In [0]:
emp_df = spark.read.option("header",True)\
                    .option("inferSchema",True)\
                    .csv('/Volumes/sample_cat/default/volume_csv/emp_dataset/employees.csv')

In [0]:
emp_df.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
emp_schema = emp_df.schema

In [0]:
emp_schema

StructType([StructField('EMPLOYEE_ID', IntegerType(), True), StructField('FIRST_NAME', StringType(), True), StructField('LAST_NAME', StringType(), True), StructField('EMAIL', StringType(), True), StructField('PHONE_NUMBER', StringType(), True), StructField('HIRE_DATE', StringType(), True), StructField('JOB_ID', StringType(), True), StructField('SALARY', IntegerType(), True), StructField('COMMISSION_PCT', StringType(), True), StructField('MANAGER_ID', StringType(), True), StructField('DEPARTMENT_ID', IntegerType(), True)])

In [0]:
emp_df.describe

<bound method DataFrame.describe of DataFrame[EMPLOYEE_ID: int, FIRST_NAME: string, LAST_NAME: string, EMAIL: string, PHONE_NUMBER: string, HIRE_DATE: string, JOB_ID: string, SALARY: int, COMMISSION_PCT: string, MANAGER_ID: string, DEPARTMENT_ID: int]>

In [0]:
cols = emp_df.columns

In [0]:
cols

['EMPLOYEE_ID',
 'FIRST_NAME',
 'LAST_NAME',
 'EMAIL',
 'PHONE_NUMBER',
 'HIRE_DATE',
 'JOB_ID',
 'SALARY',
 'COMMISSION_PCT',
 'MANAGER_ID',
 'DEPARTMENT_ID']

In [0]:
emp_df1 = emp_df.drop("COMMISSION_PCT")

In [0]:
emp_df1.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
emp_df.display()

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
# how to rename a column in dataframe

# .withColumnRenamed()

emp_df2 = emp_df.withColumnRenamed("SALARY","EMP_SALARY")\
                .withColumnRenamed("FIRST_NAME","F_NAME")

In [0]:
emp_df.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
emp_df2.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- EMP_SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
emp_df = emp_df.drop("SALARY")

In [0]:
emp_df.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
from pyspark.sql.functions import *

In [0]:
# alias 

emp_df.select(col("DEPARTMENT_ID").alias("D_ID")).display()

D_ID
50
50
10
20
20
40
70
110
110
90


In [0]:
# it will give us first row
emp_df.first()

Row(EMPLOYEE_ID=198, FIRST_NAME='Donald', LAST_NAME='OConnell', EMAIL='DOCONNEL', PHONE_NUMBER='650.507.9833', HIRE_DATE='21-JUN-07', JOB_ID='SH_CLERK', COMMISSION_PCT=' - ', MANAGER_ID='124', DEPARTMENT_ID=50)

In [0]:
# it will give us first n rows

emp_df.head(3)

[Row(EMPLOYEE_ID=198, FIRST_NAME='Donald', LAST_NAME='OConnell', EMAIL='DOCONNEL', PHONE_NUMBER='650.507.9833', HIRE_DATE='21-JUN-07', JOB_ID='SH_CLERK', COMMISSION_PCT=' - ', MANAGER_ID='124', DEPARTMENT_ID=50),
 Row(EMPLOYEE_ID=199, FIRST_NAME='Douglas', LAST_NAME='Grant', EMAIL='DGRANT', PHONE_NUMBER='650.507.9844', HIRE_DATE='13-JAN-08', JOB_ID='SH_CLERK', COMMISSION_PCT=' - ', MANAGER_ID='124', DEPARTMENT_ID=50),
 Row(EMPLOYEE_ID=200, FIRST_NAME='Jennifer', LAST_NAME='Whalen', EMAIL='JWHALEN', PHONE_NUMBER='515.123.4444', HIRE_DATE='17-SEP-03', JOB_ID='AD_ASST', COMMISSION_PCT=' - ', MANAGER_ID='101', DEPARTMENT_ID=10)]

In [0]:
# joins

1. left join   ---> matching records in both dataset + records reamin in left dataset
2. leftanti   ----> it returns records which are not matching in left side only
3. selfjoin   ----> matching records as per the condition
4. left semi  ---> inner join only but we will only matching records from left side 

In [0]:
dept_df = spark.read.option("header",True)\
                    .option("inferSchema",True)\
                    .csv("/Volumes/sample_cat/default/volume_csv/emp_dataset/departments.csv")

In [0]:
emp_df.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [0]:
dept_df.printSchema()

root
 |-- DEPARTMENT_ID: integer (nullable = true)
 |-- DEPARTMENT_NAME: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- LOCATION_ID: integer (nullable = true)



In [0]:
emp_df.join(dept_df, emp_df.DEPARTMENT_ID == dept_df.DEPARTMENT_ID ).display()

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,DEPARTMENT_ID.1,DEPARTMENT_NAME,MANAGER_ID.1,LOCATION_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50,50,Shipping,121,1500
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50,50,Shipping,121,1500
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10,10,Administration,200,1700
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20,20,Marketing,201,1800
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20,20,Marketing,201,1800
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40,40,Human Resources,203,2400
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70,70,Public Relations,204,2700
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110,110,Accounting,205,1700
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110,110,Accounting,205,1700
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90,90,Executive,100,1700


In [0]:
emp_df.join(dept_df, emp_df.DEPARTMENT_ID == dept_df.DEPARTMENT_ID, how = 'leftanti' ).display()

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID


In [0]:
emp_df.join(dept_df, emp_df.DEPARTMENT_ID == dept_df.DEPARTMENT_ID, how = 'left_semi' ).display()

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
# union 
# union all ----> in pyspark both behave same, it keeps duplicates also

In [0]:
emp_df.unionAll(emp_df).display()

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
emp_df.union(emp_df).display()

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
# type cast in dataframes ===> changing datatype

In [0]:
emp_df.select(col("EMPLOYEE_ID").cast("String")).display()

EMPLOYEE_ID
198
199
200
201
202
203
204
205
206
100


In [0]:
emp table, dept table ----> sample_cat_name.same_schema_name.emp
                            sample_cat_name.same_schema_name.dept

In [0]:
catlog name ==> hcl_org
shcema name ==> landing_emp
tablename ---> emp, dept

under the same catlog  ===> curated_emp
table_name ==> emp_details   ( emp join dept on dept_id )

hcl_org.landing_emp.emp
hcl_org.landing_emp.dept

hcl_org.curated_emp.emp_details  ( emp_id, full_name ( concat first_name + last_name (I need camel case - 1st letter should be upper case)), salary, email, dept_id, dept_name)


In [0]:
Assa Dffd

In [0]:
sample_cat.flights.flights_data