### Creating DataFrames with Python

In [1]:
# import pyspark class Row from module sql
from pyspark.sql import *

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print (department1)
print (employee2)
print (departmentWithEmployees1.employees[0].email)

Row(id='123456', name='Computer Science')
Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)
no-reply@berkeley.edu


In [2]:
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = sqlContext.createDataFrame(departmentsWithEmployeesSeq1)

print(df1)

DataFrame[department: struct<id:string,name:string>, employees: array<struct<firstName:string,lastName:string,email:string,salary:bigint>>]


In [3]:
departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = sqlContext.createDataFrame(departmentsWithEmployeesSeq2)

print(df2)

DataFrame[department: struct<id:string,name:string>, employees: array<struct<firstName:string,lastName:string,email:string,salary:bigint>>]


### Working with DataFrames

In [4]:
unionDF = df1.unionAll(df2)
print(unionDF)

DataFrame[department: struct<id:string,name:string>, employees: array<struct<firstName:string,lastName:string,email:string,salary:bigint>>]


In [5]:
!hadoop fs -rm -r "/tmp/databricks-df-example.parquet"
unionDF.write.parquet("/tmp/databricks-df-example.parquet")
parquetDF = sqlContext.read.parquet("/tmp/databricks-df-example.parquet")
print(parquetDF)

17/11/22 17:42:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `/tmp/databricks-df-example.parquet': No such file or directory
DataFrame[department: struct<id:string,name:string>, employees: array<struct<firstName:string,lastName:string,email:string,salary:bigint>>]


In [6]:
from pyspark.sql import Row
from pyspark.sql.functions import split, explode
eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()

[Row(anInt=1), Row(anInt=2), Row(anInt=3)]

In [7]:
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()

+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+



In [8]:
from pyspark.sql.functions import explode

df = parquetDF.select(explode("employees").alias("e"))
explodeDF = df.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

explodeDF.show();

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|     null| wendell|no-reply@berkeley...|160000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
|     null| wendell|no-reply@berkeley...|160000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
+---------+--------+--------------------+------+



In [9]:
filterDF = explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)
filterDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+



In [10]:
from pyspark.sql.functions import col, asc

# Use `|` instead of `or`
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
filterDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+



In [11]:
whereDF = explodeDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
whereDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
| xiangrui|    meng|no-reply@stanford...|120000|
+---------+--------+--------------------+------+



In [12]:
nonNullDF = explodeDF.fillna("--")
nonNullDF.show();

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
|       --| wendell|no-reply@berkeley...|160000|
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|      --|no-reply@waterloo...|140000|
|       --| wendell|no-reply@berkeley...|160000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|      --|no-reply@waterloo...|140000|
+---------+--------+--------------------+------+



In [13]:
filterNonNullDF = explodeDF.filter(col("firstName").isNull() | col("lastName").isNull()).sort("email")
filterNonNullDF.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|     null| wendell|no-reply@berkeley...|160000|
|     null| wendell|no-reply@berkeley...|160000|
|    matei|    null|no-reply@waterloo...|140000|
|    matei|    null|no-reply@waterloo...|140000|
+---------+--------+--------------------+------+



In [14]:
from pyspark.sql.functions import countDistinct

countDistinctDF = explodeDF.select("firstName", "lastName")\
    .groupBy("firstName", "lastName")\
    .agg(countDistinct("firstName"))
countDistinctDF.show()

+---------+--------+-------------------------+
|firstName|lastName|count(DISTINCT firstName)|
+---------+--------+-------------------------+
|     null| wendell|                        0|
|    matei|    null|                        1|
| xiangrui|    meng|                        1|
|  michael|armbrust|                        1|
+---------+--------+-------------------------+



In [15]:
countDistinctDF.explain()

== Physical Plan ==
*HashAggregate(keys=[firstName#58, lastName#59], functions=[count(distinct firstName#58)])
+- *HashAggregate(keys=[firstName#58, lastName#59], functions=[partial_count(distinct firstName#58)])
   +- *HashAggregate(keys=[firstName#58, lastName#59, firstName#58], functions=[])
      +- Exchange hashpartitioning(firstName#58, lastName#59, firstName#58, 200)
         +- *HashAggregate(keys=[firstName#58, lastName#59, firstName#58], functions=[])
            +- *Project [e#48.firstName AS firstName#58, e#48.lastName AS lastName#59]
               +- Generate explode(employees#18), false, false, [e#48]
                  +- *Scan parquet [employees#18] Format: ParquetFormat, InputPaths: hdfs://mna1:8020/tmp/databricks-df-example.parquet, PushedFilters: [], ReadSchema: struct<employees:array<struct<firstName:string,lastName:string,email:string,salary:bigint>>>


In [16]:
# register the DataFrame as a temp table so that we can query it using SQL
explodeDF.registerTempTable("databricks_df_example")

# Perform the same query as the DataFrame above and return ``explain``
countDistinctDF_sql = sqlContext.sql("SELECT firstName, lastName, count(distinct firstName) as distinct_first_names FROM databricks_df_example GROUP BY firstName, lastName")

countDistinctDF_sql.explain()

== Physical Plan ==
*HashAggregate(keys=[firstName#58, lastName#59], functions=[count(distinct firstName#58)])
+- *HashAggregate(keys=[firstName#58, lastName#59], functions=[partial_count(distinct firstName#58)])
   +- *HashAggregate(keys=[firstName#58, lastName#59, firstName#58], functions=[])
      +- Exchange hashpartitioning(firstName#58, lastName#59, firstName#58, 200)
         +- *HashAggregate(keys=[firstName#58, lastName#59, firstName#58], functions=[])
            +- *Project [e#48.firstName AS firstName#58, e#48.lastName AS lastName#59]
               +- Generate explode(employees#18), false, false, [e#48]
                  +- *Scan parquet [employees#18] Format: ParquetFormat, InputPaths: hdfs://mna1:8020/tmp/databricks-df-example.parquet, PushedFilters: [], ReadSchema: struct<employees:array<struct<firstName:string,lastName:string,email:string,salary:bigint>>>


In [17]:
salarySumDF = explodeDF.agg({"salary" : "sum"})
salarySumDF.show()

+-----------+
|sum(salary)|
+-----------+
|    1040000|
+-----------+



In [18]:
type(explodeDF.salary)

pyspark.sql.column.Column

In [19]:
explodeDF.describe("salary").show()

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                 8|
|   mean|          130000.0|
| stddev|23904.572186687874|
|    min|            100000|
|    max|            160000|
+-------+------------------+



### An example using Pandas & Matplotlib Integration

In [20]:
import pandas as pd
import matplotlib.pyplot as plt

plt.clf()
pdDF = nonNullDF.toPandas()
pdDF.plot(x='firstName', y='salary', kind='bar', rot=45)
pdDF

Unnamed: 0,firstName,lastName,email,salary
0,michael,armbrust,no-reply@berkeley.edu,100000
1,--,wendell,no-reply@berkeley.edu,160000
2,michael,armbrust,no-reply@berkeley.edu,100000
3,xiangrui,meng,no-reply@stanford.edu,120000
4,matei,--,no-reply@waterloo.edu,140000
5,--,wendell,no-reply@berkeley.edu,160000
6,xiangrui,meng,no-reply@stanford.edu,120000
7,matei,--,no-reply@waterloo.edu,140000


In [21]:
!hadoop fs -rm -r "/tmp/databricks-df-example.parquet"

17/11/22 17:42:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/22 17:42:59 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /tmp/databricks-df-example.parquet


### DataFrame FAQs

In [22]:
!hadoop fs -rm -r "/tmp/dataframe_sample.csv"
!mkdir -p ~/data

!echo id,end_date,start_date,location >  ~/data/dataframe_sample.csv
!echo 1,2015-10-14 00:00:00,2015-09-14 00:00:00,CA-SF >>  ~/data/dataframe_sample.csv
!echo 2,2015-10-15 01:00:20,2015-08-14 00:00:00,CA-SD >>  ~/data/dataframe_sample.csv
!echo 3,2015-10-16 02:30:00,2015-01-14 00:00:00,NY-NY >>  ~/data/dataframe_sample.csv
!echo 4,2015-10-17 03:00:20,2015-02-14 00:00:00,NY-NY >>  ~/data/dataframe_sample.csv
!echo 5,2015-10-18 04:30:00,2014-04-14 00:00:00,CA-SD >>  ~/data/dataframe_sample.csv

!hadoop fs -put ~/data/dataframe_sample.csv /tmp/dataframe_sample.csv

from pyspark.sql import functions as F
from pyspark.sql.types import *

formatPackage = "csv"
df = sqlContext.read.format(formatPackage).options(header='true', delimiter = ',').load("/tmp/dataframe_sample.csv")

df.printSchema()
df.show()

17/11/22 17:43:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/22 17:43:01 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /tmp/dataframe_sample.csv
17/11/22 17:43:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
 |-- id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- location: string (nullable = true)

+---+-------------------+-------------------+--------+
| id|           end_date|         start_date|location|
+---+-------------------+-------------------+--------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|
|  4|2015-10-17 03:00:20|2015-02-14 00:00

In [23]:
# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))
df.show()

# Parse out the date only
df = df.withColumn('date_only', F.regexp_replace(df.end_date,' (\d+)[:](\d+)[:](\d+).*$', ''))
df.show()

# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])
df.show()

# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))
df.show()

+---+-------------------+-------------------+--------+----------+
| id|           end_date|         start_date|location|      date|
+---+-------------------+-------------------+--------+----------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|2015-10-16|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|2015-10-17|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|2015-10-18|
+---+-------------------+-------------------+--------+----------+

+---+-------------------+-------------------+--------+----------+----------+
| id|           end_date|         start_date|location|      date| date_only|
+---+-------------------+-------------------+--------+----------+----------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|
|  3|2015-10-16 02:3

In [24]:
df.registerTempTable("sample_df")
spark.sql("select * from sample_df")

DataFrame[id: string, end_date: string, start_date: string, location: string, date: date, date_only: string, city: string, date_diff: int]

In [25]:
rdd_json = df.toJSON()
rdd_json.take(2)

['{"id":"1","end_date":"2015-10-14 00:00:00","start_date":"2015-09-14 00:00:00","location":"CA-SF","date":"2015-10-14","date_only":"2015-10-14","city":"SF","date_diff":30}',
 '{"id":"2","end_date":"2015-10-15 01:00:20","start_date":"2015-08-14 00:00:00","location":"CA-SD","date":"2015-10-15","date_only":"2015-10-15","city":"SD","date_diff":62}']

In [26]:
from pyspark.sql import functions as F

add_n = F.udf(lambda x, y: x + y, IntegerType())

# We register a UDF that adds a column to the DataFrame, and we cast the id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
df.show()

+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|id_offset|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|     1001|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|  SD|       62|     1002|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|2015-10-16|2015-10-16|  NY|      275|     1003|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|2015-10-17|2015-10-17|  NY|      245|     1004|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|2015-10-18|2015-10-18|  SD|      552|     1005|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+



In [27]:
# any constants used by UDF will automatically pass through to workers
N = 90
last_n_days = F.udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
df_filtered.show()

+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|id_offset|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|     1001|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|  SD|       62|     1002|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+



In [28]:
#df_1 = df.createOrReplaceTempView("sample_df")
#df_1.show()
df_2 = spark.sql("select * from sample_df")
df_2.show()

+---+-------------------+-------------------+--------+----------+----------+----+---------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|
+---+-------------------+-------------------+--------+----------+----------+----+---------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|  SD|       62|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|2015-10-16|2015-10-16|  NY|      275|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|2015-10-17|2015-10-17|  NY|      245|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|2015-10-18|2015-10-18|  SD|      552|
+---+-------------------+-------------------+--------+----------+----------+----+---------+



In [29]:
sqlContext.clearCache()
sqlContext.cacheTable("sample_df")
sqlContext.uncacheTable("sample_df")

In [30]:
agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
agg_df.show()

+--------+-------+---------+--------------+
|location|min(id)|count(id)|avg(date_diff)|
+--------+-------+---------+--------------+
|   NY-NY|      3|        2|         260.0|
|   CA-SF|      1|        1|          30.0|
|   CA-SD|      2|        2|         307.0|
+--------+-------+---------+--------------+



In [31]:
!hadoop fs -rm -r "/tmp/sample_table"
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")
!hadoop fs -ls "/tmp/sample_table"
#display(dbutils.fs.ls("/tmp/sample_table"))

17/11/22 17:43:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/22 17:43:09 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /tmp/sample_table
17/11/22 17:43:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2017-11-22 17:43 /tmp/sample_table/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2017-11-22 17:43 /tmp/sample_table/end_year=2015


In [32]:
null_item_schema = StructType([StructField("col1", StringType(), True),
                               StructField("col2", IntegerType(), True)])
null_df = sqlContext.createDataFrame([("test", 1), (None, 2)], null_item_schema)
null_df.filter("col1 IS NOT NULL").show()

+----+----+
|col1|col2|
+----+----+
|test|   1|
+----+----+

