###  1: DataFrames

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('jvt').getOrCreate()

In [2]:
df = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"])

In [3]:
df.count()

3

In [4]:
df.show()

+---+---+
|  A|  B|
+---+---+
|  1|  4|
|  2|  5|
|  3|  6|
+---+---+



In [5]:
#Returns Column Object
df.A

Column<b'A'>

In [None]:
df.select('A').show()

In [None]:
#Adding a new column
df.withColumn('C',df.A+1).show()

In [None]:
from pyspark.sql.functions import lit
df.withColumn('C',lit(5)).show()

In [None]:
df = df.withColumn('C',lit(5))

In [None]:
df.select('A',(df.A > 2).alias("State")).show()

In [None]:
df[(df.A > 2)].show()

### GroupBy

In [None]:
df = spark.createDataFrame([('a',33), ('b',11), ('a',22)],['names','age'])

In [None]:
gdf = df.groupBy(df.names)

In [None]:
gdf.agg({"*":"count"}).collect()

In [None]:
from pyspark.sql import functions as F
df = spark.createDataFrame([('a',33), ('b',11), ('a',22)],['names','age'])
gdf = df.groupBy(df.names)

sorted(gdf.agg(F.min(df.age)).collect())

In [None]:
g2df = df.groupBy(df.names)
g2df.min('age').collect()

### Generate your own DataFrame
Create `stringRDD` RDD and then convert it into a DataFrame when we're reading `stringJSONRDD` using `spark.read.json`.

In [None]:
# Generate our own JSON data 

stringJSONRDD = spark.sparkContext.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [None]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [None]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [None]:
# DataFrame API
swimmersJSON.show()

In [None]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()

In [25]:
spark.sql("select * from swimmersJSON")

DataFrame[age: bigint, eyeColor: string, id: string, name: string]

#### Inferring the Schema Using Reflection
Note that Apache Spark is inferring the schema using reflection; i.e. it automaticlaly determines the schema of the data based on reviewing the JSON data.

In [26]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Notice that Spark was able to determine infer the schema (when reviewing the schema using `.printSchema`).

But what if we want to programmatically specify the schema?

#### Programmatically Specifying the Schema
In this case, let's specify the schema for a `CSV` text file.

In [27]:
from pyspark.sql.types import *

stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# The schema is encoded in a string, using StructType we define the schema using various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),    
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

NameError: name 'sc' is not defined

In [None]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

In [None]:
spark.sql("select * from swimmers")

As you can see from above, we can programmatically apply the `schema` instead of allowing the Spark engine to infer the schema via reflection.

Additional Resources include:
* [PySpark API Reference](https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html)
* [Spark SQL, DataFrames, and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema): This is in reference to Programmatically Specifying the Schema using a `CSV` file.

### Querying with dataframe

In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()


In [None]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

In [None]:
spark.sql("select id, age from swimmers where age = 22")

In [None]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

In [None]:
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'")

### Querying with the DataFrame API
With DataFrames, you can start writing your queries using the DataFrame API

In [None]:
# Show the values 
swimmers.show()

In [None]:
# Get count of rows
swimmers.count()

In [None]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

### DataFrame Queries
* Understanding explode, selectExpr

In [None]:
# import pyspark class Row from module sql
from pyspark.sql import *

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee11 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 200000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee31 = Employee('matei', None, 'no-reply@waterloo.edu', 180000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4, employee11])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee31])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

In [None]:
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = sqlContext.createDataFrame(departmentsWithEmployeesSeq1)

departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = sqlContext.createDataFrame(departmentsWithEmployeesSeq2)

In [None]:
df1.show()

In [None]:
df2.show()

In [None]:
unionDF = df1.unionAll(df2)

In [None]:
unionDF.show()

In [None]:
from pyspark.sql.functions import explode

df = unionDF.select("department",explode("employees").alias("e"))

In [None]:
df.show()

In [None]:
df.collect()

In [None]:
df.selectExpr("department.id","department.name","e.firstName", "e.lastName", "e.email", "e.salary").show()

In [None]:
from pyspark.sql.functions import explode

df = unionDF.select(explode("employees").alias("e"))

explodeDF = df.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")
explodeDF.show()

In [None]:
filterDF = explodeDF.filter( explodeDF.firstName == 'xiangrui').sort(explodeDF.salary)

In [None]:
filterDF.show()

In [None]:
from pyspark.sql.functions import col, asc, desc
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(desc("lastName"))
filterDF.show()

In [None]:
whereDF = explodeDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
whereDF.show()

### Handling Missing Data

In [None]:
from pyspark.sql.functions import col, asc, desc
filterNonNullDF = explodeDF.filter(col("firstName").isNotNull()).filter(col("lastName").isNotNull()).sort("email")
filterNonNullDF.show()

In [28]:
from pyspark.sql.functions import countDistinct,count

countDistinctDF = explodeDF.select("firstName", "lastName")\
  .groupBy("firstName", "lastName")\
  .agg(countDistinct("firstName"))

countDistinctDF.show()

NameError: name 'explodeDF' is not defined

from pyspark.sql.functions import count

countDistinctDF = explodeDF.select("firstName", "lastName")\
  .groupBy("firstName", "lastName")\
  .agg(count("*"))
countDistinctDF.show()

In [29]:
explodeDF.describe("salary").show()

NameError: name 'explodeDF' is not defined

For more information, please refer to:
* [Spark SQL, DataFrames and Datasets Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#sql)
* [PySpark SQL Module: DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
* [PySpark SQL Functions Module](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)

### DropDuplicates

In [30]:
df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [31]:
df = df.dropDuplicates()
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  1| 144.5|   5.9| 33|     M|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+



In [32]:
df.count()

6

In [33]:
#Duplicates except for id column
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])

In [34]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [35]:
[c for c in df.columns if c != 'id']

['weight', 'height', 'age', 'gender']

### Aggregation

In [36]:

import pyspark.sql.functions as F
df.agg(
  F.count('id').alias('count'),
  F.countDistinct('id').alias('distinct')
).show()

+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+



### More on Handling Missing Data

In [37]:
df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28,   'M',  100000),
        (2, 167.2, 5.4, 45,   'M',  None),
        (3, None , 5.2, None, None, None),
        (4, 144.5, 5.9, 33,   'M',  None),
        (5, 133.2, 5.7, 54,   'F',  None),
        (6, 124.1, 5.2, None, 'F',  None),
        (7, 129.2, 5.3, 42,   'M',  76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])

In [38]:
df_miss.show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  3|  null|   5.2|null|  null|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+



In [39]:
df_miss.printSchema()

root
 |-- id: long (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- income: long (nullable = true)



In [40]:
df_miss.describe()

DataFrame[summary: string, id: string, weight: string, height: string, age: string, gender: string, income: string]

In [41]:
#Calculate missing columns for each row
df_miss.rdd.collect()

[Row(id=1, weight=143.5, height=5.6, age=28, gender='M', income=100000),
 Row(id=2, weight=167.2, height=5.4, age=45, gender='M', income=None),
 Row(id=3, weight=None, height=5.2, age=None, gender=None, income=None),
 Row(id=4, weight=144.5, height=5.9, age=33, gender='M', income=None),
 Row(id=5, weight=133.2, height=5.7, age=54, gender='F', income=None),
 Row(id=6, weight=124.1, height=5.2, age=None, gender='F', income=None),
 Row(id=7, weight=129.2, height=5.3, age=42, gender='M', income=76000)]

In [42]:
df_miss.rdd.map( lambda r: (r['id'], sum([c == None  for c in r]) )).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [43]:
df_miss.where('id == 3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [44]:
import pyspark.sql.functions as F
df_miss.agg(
 F.count('weight'), F.count('height'), F.count('age'),F.count('gender'),F.count('income'),
 F.count('*')
).show()

+-------------+-------------+----------+-------------+-------------+--------+
|count(weight)|count(height)|count(age)|count(gender)|count(income)|count(1)|
+-------------+-------------+----------+-------------+-------------+--------+
|            6|            7|         5|            6|            2|       7|
+-------------+-------------+----------+-------------+-------------+--------+



In [45]:
import pyspark.sql.functions as F
df_miss.agg(
 *[F.count(c)  for c in df.columns]
).show()

+---------+-------------+-------------+----------+-------------+
|count(id)|count(weight)|count(height)|count(age)|count(gender)|
+---------+-------------+-------------+----------+-------------+
|        7|            6|            7|         5|            6|
+---------+-------------+-------------+----------+-------------+



In [46]:
df_miss.dropna(thresh=3).show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+

