In [0]:
from pyspark.sql import functions as pysf

In [0]:
# Location of the student CSV read throughout this notebook.
student_data_csv = (
    "s3://msd-dev-team/praveen.k/pyspark_practice/student_data.csv"
)

Defining a schema for a DataFrame in PySpark

In [0]:
from pyspark.sql import types as pyst

# Explicit schema in the same column order as the CSV header
# (age, gender, name, course, roll, marks, email).
# `age` and `marks` are integers; every other column is kept as a string.
schema = (
    pyst.StructType()
    .add("age", pyst.IntegerType())
    .add("gender", pyst.StringType())
    .add("name", pyst.StringType())
    .add("course", pyst.StringType())
    .add("roll", pyst.StringType())
    .add("marks", pyst.IntegerType())
    .add("email", pyst.StringType())
)

In [0]:
# Load the CSV, consuming the header row and applying the explicit `schema`
# so that `age` and `marks` arrive as integers instead of strings.
df = spark.read.csv(student_data_csv, schema=schema, header=True)

In [0]:
# Preview the loaded rows, then print the column types that `schema` imposed.
df.show(n=20)
df.printSchema()

RDD to DataFrame in PySpark

In [0]:
# Read the raw CSV as an RDD of text lines; the header line is still included
# at this point and is removed in the next cell.
rdd = sc.textFile(student_data_csv, use_unicode=True)

In [0]:
# Drop the header line and split every remaining line into its comma-separated fields.
# The raw lines are read into their own name instead of doing `rdd = rdd.filter(...)`:
# the self-referential form breaks on a re-run, because `rdd.first()` would then
# return an already-split list and `.split(',')` would fail.
# NOTE: a plain split(',') assumes no field contains a quoted comma.
rawLines = sc.textFile(student_data_csv)
headers = rawLines.first()
rdd = rawLines.filter(lambda line: line != headers).map(lambda line: line.split(','))

In [0]:
# Build a DataFrame directly from the split RDD, using the header fields as column names.
# Every column is a string here, because the RDD only holds raw text fields.
# The result gets its own name: overwriting `df` would replace the schema-typed
# DataFrame loaded from CSV. Later cells would then sort `marks` lexically, and
# `groupBy(...).min/max/avg/mean('marks')` would fail on a non-numeric column.
columns = headers.split(',')
dfFromRdd = rdd.toDF(columns)
dfFromRdd.show()

Conversion of an RDD to a DataFrame with an explicit schema

In [0]:
# Cast field 0 (`age`) and field 5 (`marks`) to int so each row matches `schema`,
# then build the DataFrame with that explicit schema.
rddToDf = rdd.map(lambda fields: [int(fields[0]), *fields[1:5], int(fields[5]), fields[6]])
dfRdd = spark.createDataFrame(rddToDf, schema)
dfRdd.printSchema()

In [0]:
# Render the typed DataFrame with the notebook's `display` helper
# (presumably the Databricks rich table view — confirm in other environments).
display(dfRdd)

age,gender,name,course,roll,marks,email
28,Female,Hubert Oliveras,DB,2984,59,Annika Hoffman_Naoma Fritts@OOP.com
29,Female,Toshiko Hillyard,Cloud,12899,62,Margene Moores_Marylee Capasso@DB.com
28,Male,Celeste Lollis,PF,21267,45,Jeannetta Golden_Jenna Montague@DSA.com
29,Female,Elenore Choy,DB,32877,29,Billi Clore_Mitzi Seldon@DB.com
28,Male,Sheryll Towler,DSA,41487,41,Claude Panos_Judie Chipps@OOP.com
28,Male,Margene Moores,MVC,52771,32,Toshiko Hillyard_Clementina Menke@MVC.com
28,Male,Neda Briski,OOP,61973,69,Alberta Freund_Elenore Choy@DB.com
28,Female,Claude Panos,Cloud,72409,85,Sheryll Towler_Alberta Freund@Cloud.com
28,Male,Celeste Lollis,MVC,81492,64,Nicole Harwood_Claude Panos@MVC.com
29,Male,Cordie Harnois,OOP,92882,51,Judie Chipps_Clementina Menke@MVC.com


**select in DF**

In [0]:
# Project only `name` and `age`; select() accepts a list of names as well as varargs.
df.select(["name", "age"]).show()

In [0]:
# Same kind of projection, but the columns are passed as Column objects built with col().
(
    df.select(
        pysf.col("course"),
        pysf.col("roll"),
    )
    .show()
)

In [0]:
# Select every column from position 4 onward by slicing the list of column names.
df.select(*df.columns[4:]).show()

**withColumn in DF**

In [0]:
# lit() adds a constant-valued column; the second call derives a new column from `marks`.
(
    df.withColumn(colName="section", col=pysf.lit("C"))
    .limit(5)
    .show()
)
(
    df.withColumn(colName="updatedMarks", col=pysf.col("marks") + 10)
    .limit(5)
    .show()
)

**withColumnRenamed and alias in DF**

In [0]:
# withColumnRenamed renames one column and keeps all the others;
# alias only names the expression inside the select it appears in.
(
    df.withColumnRenamed(existing="name", new="full name")
    .limit(5)
    .show()
)
(
    df.select(pysf.col("course").alias("subject"))
    .limit(5)
    .show()
)

**filter in DF**
- normal filter
- contains
- like
- startswith
- endswith

In [0]:
# Exact match, substring match, SQL LIKE pattern, and a prefix/suffix test combined with `&`.
df.filter(df["course"] == "DB").limit(3).show()
df.filter(df["name"].contains("ac")).limit(3).show()
df.filter(df["name"].like("Er%")).limit(3).show()
(
    df.filter(
        df["name"].startswith("E")
        & df["course"].endswith("B")
    )
    .limit(3)
    .show()
)

**distinct, count, dropDuplicates in DF**

In [0]:
# Total number of rows in the DataFrame (displayed as the cell's output).
df.count()

In [0]:
# Number of distinct (gender, name) combinations.
(
    df.select("gender", "name")
    .distinct()
    .count()
)

In [0]:
# Keep one row per (name, age) pair; duplicates are judged on these two columns only.
df.drop_duplicates(subset=["name", "age"]).show()

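**sort, orderBy DF**
- In the DataFrame API, `sort()` and `orderBy()` are aliases. Both produce a global ordering of the entire DataFrame, which requires a shuffle. [costly operation]
- To sort only within each partition, use `sortWithinPartitions()`. It avoids the global ordering, so the overall output is not fully sorted. [cheaper operation]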

In [0]:
# Ascending on both columns, then mixed directions via Column.asc()/desc();
# orderBy is called with the same arguments as the second sort for comparison.
(
    df.sort("marks", "age")
    .limit(5)
    .show()
)
(
    df.sort(df["marks"].asc(), df["age"].desc())
    .limit(5)
    .show()
)
(
    df.orderBy(df["marks"].asc(), df["age"].desc())
    .limit(5)
    .show()
)

**groupBy DF**

In [0]:
# Rows per course, then min / max / average of `marks` per course.
# min/max/avg/mean require `marks` to be a numeric column; avg and mean are the same aggregation.
df.groupBy("course").count().show()
df.groupBy("course").min("marks").show()
df.groupBy("course").max("marks").show()
df.groupBy("course").avg("marks").show()
df.groupBy("course").mean("marks").show()

**groupBy with agg in DF**
 - Aggregate functions are applied to a group of rows to form a single value.

In [0]:
# Several aggregates computed at once for each (course, gender) group.
df.groupBy("course", "gender").agg(
    pysf.count("*"),
    pysf.sum("marks"),
    pysf.max("marks"),
    pysf.min("marks"),
).show()

# Same aggregation limited to male students, keeping only the groups whose summed
# marks exceed 4000; the filter after agg plays the role of SQL HAVING.
(
    df.filter(pysf.col("gender") == "Male")
    .groupBy("course", "gender")
    .agg(
        pysf.count("*"),
        pysf.sum("marks").alias("total_marks"),
        pysf.max("marks"),
        pysf.min("marks"),
    )
    .filter(pysf.col("total_marks") > 4000)
    .show()
)