### In this noteobook we will learn about basics of DataFrame

In [1]:
# Create a Spark Session

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark_sess = SparkSession.builder.appName('SparkBasics').getOrCreate()

#What are SparkSession Config Options
----------------------------------------------------------
https://stackoverflow.com/questions/43024766/what-are-sparksession-config-options

http://spark.apache.org/docs/latest/configuration.html


In [4]:
'''
To get all the "various Spark parameters as key-value pairs" for a SparkSession, “The entry point to programming Spark with 
the Dataset and DataFrame API," run the following (this is using spark python api, scala would be very similar). 
'''

spark_sess.sparkContext.getConf().getAll()


[('spark.app.id', 'local-1588689623766'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '192.168.0.16'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'SparkBasics'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '52912')]

In [5]:
spark_sess.sparkContext._conf.getAll()

[('spark.app.id', 'local-1588689623766'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '192.168.0.16'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'SparkBasics'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '52912')]

In [6]:
spark_sess

In [7]:
#help(SparkSession)

###### CREATE DATAFRAME

In [8]:
# Read a json file
df = spark_sess.read.json('rdu_weather_history.json')

In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
#Show DataFrame
df.show(5)

+------------+-----------+----------+-------+----+------------------+--------------------+------------------+--------------------+---+---------+--------+-----------+------------+-----+----+--------+---+----+-------------+----+---------+----+---------+--------+--------------+--------------+-------+
|avgwindspeed|blowingsnow|      date|drizzle|dust|fastest2minwinddir|fastest2minwindspeed|fastest5secwinddir|fastest5secwindspeed|fog|fogground|fogheavy|freezingfog|freezingrain|glaze|hail|highwind|ice|mist|precipitation|rain|smokehaze|snow|snowdepth|snowfall|temperaturemax|temperaturemin|thunder|
+------------+-----------+----------+-------+----+------------------+--------------------+------------------+--------------------+---+---------+--------+-----------+------------+-----+----+--------+---+----+-------------+----+---------+----+---------+--------+--------------+--------------+-------+
|        8.05|         No|2007-01-06|     No|  No|               230|                17.9|             

In [11]:
#Show Schema of DataFrame
df.printSchema()

root
 |-- avgwindspeed: double (nullable = true)
 |-- blowingsnow: string (nullable = true)
 |-- date: string (nullable = true)
 |-- drizzle: string (nullable = true)
 |-- dust: string (nullable = true)
 |-- fastest2minwinddir: long (nullable = true)
 |-- fastest2minwindspeed: double (nullable = true)
 |-- fastest5secwinddir: long (nullable = true)
 |-- fastest5secwindspeed: double (nullable = true)
 |-- fog: string (nullable = true)
 |-- fogground: string (nullable = true)
 |-- fogheavy: string (nullable = true)
 |-- freezingfog: string (nullable = true)
 |-- freezingrain: string (nullable = true)
 |-- glaze: string (nullable = true)
 |-- hail: string (nullable = true)
 |-- highwind: string (nullable = true)
 |-- ice: string (nullable = true)
 |-- mist: string (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- rain: string (nullable = true)
 |-- smokehaze: string (nullable = true)
 |-- snow: string (nullable = true)
 |-- snowdepth: double (nullable = true)
 |-- snowfa

In [12]:
#Get Column Names
df.columns

['avgwindspeed',
 'blowingsnow',
 'date',
 'drizzle',
 'dust',
 'fastest2minwinddir',
 'fastest2minwindspeed',
 'fastest5secwinddir',
 'fastest5secwindspeed',
 'fog',
 'fogground',
 'fogheavy',
 'freezingfog',
 'freezingrain',
 'glaze',
 'hail',
 'highwind',
 'ice',
 'mist',
 'precipitation',
 'rain',
 'smokehaze',
 'snow',
 'snowdepth',
 'snowfall',
 'temperaturemax',
 'temperaturemin',
 'thunder']

In [13]:
#Get Statistical Summary of the numerical columns of the DataFrame
df.describe().show()

+-------+------------------+-----------+----------+-------+----+------------------+--------------------+------------------+--------------------+----+---------+--------+-----------+------------+-----+----+--------+----+----+-------------------+----+---------+----+-------------------+--------------------+------------------+------------------+-------+
|summary|      avgwindspeed|blowingsnow|      date|drizzle|dust|fastest2minwinddir|fastest2minwindspeed|fastest5secwinddir|fastest5secwindspeed| fog|fogground|fogheavy|freezingfog|freezingrain|glaze|hail|highwind| ice|mist|      precipitation|rain|smokehaze|snow|          snowdepth|            snowfall|    temperaturemax|    temperaturemin|thunder|
+-------+------------------+-----------+----------+-------+----+------------------+--------------------+------------------+--------------------+----+---------+--------+-----------+------------+-----+----+--------+----+----+-------------------+----+---------+----+-------------------+---------------

In [14]:
df.describe(['avgwindspeed','fastest2minwinddir','precipitation','snowfall','snowdepth']).show()

+-------+------------------+------------------+-------------------+--------------------+-------------------+
|summary|      avgwindspeed|fastest2minwinddir|      precipitation|            snowfall|          snowdepth|
+-------+------------------+------------------+-------------------+--------------------+-------------------+
|  count|              4868|              4869|               4870|                4870|               4870|
|   mean|5.8825061626951545|172.90819470117066|0.12882956878850058|0.013353182751540042|0.01670431211498974|
| stddev|2.9389954879312006| 94.04803485513821|  0.367784307000846| 0.21083299787643506| 0.2075011496153487|
|    min|               0.0|                10|                0.0|                 0.0|                0.0|
|    max|             20.36|               360|               6.45|                7.01|               5.91|
+-------+------------------+------------------+-------------------+--------------------+-------------------+



In [None]:
data = df.select("data").collect()[0]['data']

## Writing SQL Queries on DataFrame

##### Temporary View

In [15]:
#Register DataFrame as a SQL Temporary View

df.createOrReplaceTempView('weather')

In [16]:
#The sql function on a SparkSession enables applications to run SQL queries programmatically and 
#returns the result as a DataFrame.

res = spark_sess.sql("SELECT date,rain,snow,ice,fog,temperaturemax,temperaturemin FROM weather")

In [17]:
res

DataFrame[date: string, rain: string, snow: string, ice: string, fog: string, temperaturemax: double, temperaturemin: double]

In [18]:
type(res)

pyspark.sql.dataframe.DataFrame

In [19]:
res.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|          71.1|          50.0|
|2007-01-09| Yes|  No| No| No|          55.0|          30.0|
|2007-01-14|  No|  No| No| No|          73.9|          50.0|
|2007-01-15|  No|  No| No| No|          73.9|          57.0|
|2007-01-20|  No|  No| No| No|          48.0|          26.1|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [20]:
res1 = spark_sess.sql("SELECT date,rain,snow,ice,fog,temperaturemax,temperaturemin FROM weather WHERE temperaturemax > 95 ORDER BY 1")

In [21]:
res1.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-06-08|  No|  No| No|Yes|          96.1|          73.0|
|2007-06-18|  No|  No| No|Yes|          96.1|          63.0|
|2007-06-19| Yes|  No| No|Yes|          97.0|          70.0|
|2007-07-09|  No|  No| No|Yes|          98.1|          71.1|
|2007-07-17| Yes|  No| No|Yes|          96.1|          70.0|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



##### Global Temporary View

In [22]:
'''
Temporary views in Spark SQL are session-scoped and will disappear if the session, that creates it, terminates.
If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, 
you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and 
we must use the qualified name to refer it, e.g. SELECT * FROM global_temp.view1.
'''

df.createOrReplaceGlobalTempView('weather_global_view')

In [23]:
res2 = spark_sess.sql('SELECT date,rain,snow,ice,fog,temperaturemax,temperaturemin FROM global_temp.weather_global_view')

In [24]:
type(res2)

pyspark.sql.dataframe.DataFrame

In [25]:
res2.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|          71.1|          50.0|
|2007-01-09| Yes|  No| No| No|          55.0|          30.0|
|2007-01-14|  No|  No| No| No|          73.9|          50.0|
|2007-01-15|  No|  No| No| No|          73.9|          57.0|
|2007-01-20|  No|  No| No| No|          48.0|          26.1|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [26]:
spark_sess.sql('SELECT date,rain,snow,ice,fog,temperaturemax,temperaturemin FROM global_temp.weather_global_view').show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|          71.1|          50.0|
|2007-01-09| Yes|  No| No| No|          55.0|          30.0|
|2007-01-14|  No|  No| No| No|          73.9|          50.0|
|2007-01-15|  No|  No| No| No|          73.9|          57.0|
|2007-01-20|  No|  No| No| No|          48.0|          26.1|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



## Interoperating with RDDs -- Schema Inference

Spark SQL supports two different methods for converting existing RDDs into Datasets. 

1. The first method uses "reflection to infer the schema" of an RDD that contains specific types of objects.
This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

2. The second method for creating Datasets is through a "programmatic interface" that allows you to construct a schema and 
then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and 
their types are not known until runtime.

###### Inferring the Schema Using Reflection

In [27]:
sc = spark_sess.sparkContext

In [28]:
weather_rdd = sc.textFile('rdu_weather_history.csv')

In [29]:
type(weather_rdd)

pyspark.rdd.RDD

In [30]:
#See data in RDD
weather_rdd.collect()

['date;temperaturemin;temperaturemax;precipitation;snowfall;snowdepth;avgwindspeed;fastest2minwinddir;fastest2minwindspeed;fastest5secwinddir;fastest5secwindspeed;fog;fogheavy;mist;rain;fogground;ice;glaze;drizzle;snow;freezingrain;smokehaze;thunder;highwind;hail;blowingsnow;dust;freezingfog',
 '2007-01-06;50.0;71.1;0.13;0.0;0.0;8.05;230;17.9;230;21.92;Yes;No;Yes;Yes;No;No;No;No;No;No;No;No;No;No;No;No;No',
 '2007-01-09;30.0;55.0;0.0;0.0;0.0;7.61;280;23.04;270;29.08;No;No;No;Yes;No;No;No;No;No;No;No;No;No;No;No;No;No',
 '2007-01-14;50.0;73.9;0.0;0.0;0.0;8.5;230;21.03;230;25.05;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No',
 '2007-01-15;57.0;73.9;0.0;0.0;0.0;13.2;230;23.94;230;29.08;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No',
 '2007-01-20;26.1;48.0;0.0;0.0;0.0;4.92;290;16.11;320;21.03;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No;No',
 '2007-01-22;33.1;41.0;0.08;0.0;0.0;2.01;230;8.05;10;12.08;Yes;No;Yes;Yes;No;No;No;Yes;No;No;No;No;No;No;No;No;No',
 '2007-01-24;30.0;48.

In [31]:
weather_rdd1 = weather_rdd.map(lambda l : l.split(';'))

In [32]:
type(weather_rdd1)

pyspark.rdd.PipelinedRDD

In [33]:
weather_rdd1.collect()

[['date',
  'temperaturemin',
  'temperaturemax',
  'precipitation',
  'snowfall',
  'snowdepth',
  'avgwindspeed',
  'fastest2minwinddir',
  'fastest2minwindspeed',
  'fastest5secwinddir',
  'fastest5secwindspeed',
  'fog',
  'fogheavy',
  'mist',
  'rain',
  'fogground',
  'ice',
  'glaze',
  'drizzle',
  'snow',
  'freezingrain',
  'smokehaze',
  'thunder',
  'highwind',
  'hail',
  'blowingsnow',
  'dust',
  'freezingfog'],
 ['2007-01-06',
  '50.0',
  '71.1',
  '0.13',
  '0.0',
  '0.0',
  '8.05',
  '230',
  '17.9',
  '230',
  '21.92',
  'Yes',
  'No',
  'Yes',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No'],
 ['2007-01-09',
  '30.0',
  '55.0',
  '0.0',
  '0.0',
  '0.0',
  '7.61',
  '280',
  '23.04',
  '270',
  '29.08',
  'No',
  'No',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No'],
 ['2007-01-14',
  '50.0',
  '73.9',
  '0.0',
  '0.0',
  '0.0',
 

In [34]:
from pyspark.sql import Row

weather_rows = weather_rdd1.map(lambda p: Row(date = p[0], temperaturemax = p[2] ))

In [35]:
type(weather_rows)

pyspark.rdd.PipelinedRDD

In [36]:
weather_rows.collect()

[Row(date='date', temperaturemax='temperaturemax'),
 Row(date='2007-01-06', temperaturemax='71.1'),
 Row(date='2007-01-09', temperaturemax='55.0'),
 Row(date='2007-01-14', temperaturemax='73.9'),
 Row(date='2007-01-15', temperaturemax='73.9'),
 Row(date='2007-01-20', temperaturemax='48.0'),
 Row(date='2007-01-22', temperaturemax='41.0'),
 Row(date='2007-01-24', temperaturemax='48.9'),
 Row(date='2007-01-27', temperaturemax='64.0'),
 Row(date='2007-01-30', temperaturemax='48.9'),
 Row(date='2007-02-08', temperaturemax='48.0'),
 Row(date='2007-02-12', temperaturemax='64.0'),
 Row(date='2007-02-13', temperaturemax='50.0'),
 Row(date='2007-02-17', temperaturemax='48.0'),
 Row(date='2007-02-21', temperaturemax='73.0'),
 Row(date='2007-02-23', temperaturemax='55.0'),
 Row(date='2007-02-28', temperaturemax='61.0'),
 Row(date='2007-03-01', temperaturemax='66.9'),
 Row(date='2007-03-11', temperaturemax='70.0'),
 Row(date='2007-03-15', temperaturemax='81.0'),
 Row(date='2007-03-17', temperaturem

In [37]:
weather_df = spark_sess.createDataFrame(weather_rows)

In [38]:
type(weather_df)

pyspark.sql.dataframe.DataFrame

In [39]:
weather_df.show(5)

+----------+--------------+
|      date|temperaturemax|
+----------+--------------+
|      date|temperaturemax|
|2007-01-06|          71.1|
|2007-01-09|          55.0|
|2007-01-14|          73.9|
|2007-01-15|          73.9|
+----------+--------------+
only showing top 5 rows



##### Programmatically Specifying the Schema

In [40]:
#Didn't run it since it's already ran in previous cell

sc = spark_sess.sparkContext
weather_rdd = sc.textFile('rdu_weather_history.csv')
weather_rdd1 = weather_rdd.map(lambda l : l.split(';'))

In [41]:
weather_tuples= weather_rdd1.map(lambda l : (l[0],l[1],l[2]))

In [42]:
type(weather_tuples)

pyspark.rdd.PipelinedRDD

In [43]:
weather_tuples.collect()

[('date', 'temperaturemin', 'temperaturemax'),
 ('2007-01-06', '50.0', '71.1'),
 ('2007-01-09', '30.0', '55.0'),
 ('2007-01-14', '50.0', '73.9'),
 ('2007-01-15', '57.0', '73.9'),
 ('2007-01-20', '26.1', '48.0'),
 ('2007-01-22', '33.1', '41.0'),
 ('2007-01-24', '30.0', '48.9'),
 ('2007-01-27', '32.0', '64.0'),
 ('2007-01-30', '24.1', '48.9'),
 ('2007-02-08', '27.0', '48.0'),
 ('2007-02-12', '26.1', '64.0'),
 ('2007-02-13', '37.9', '50.0'),
 ('2007-02-17', '19.0', '48.0'),
 ('2007-02-21', '50.0', '73.0'),
 ('2007-02-23', '37.0', '55.0'),
 ('2007-02-28', '37.9', '61.0'),
 ('2007-03-01', '41.0', '66.9'),
 ('2007-03-11', '39.9', '70.0'),
 ('2007-03-15', '57.0', '81.0'),
 ('2007-03-17', '33.1', '48.9'),
 ('2007-03-21', '50.0', '63.0'),
 ('2007-03-23', '53.1', '82.9'),
 ('2007-03-26', '44.1', '73.9'),
 ('2007-04-03', '53.1', '86.0'),
 ('2007-04-05', '41.0', '57.9'),
 ('2007-04-08', '27.0', '55.9'),
 ('2007-04-11', '46.0', '59.0'),
 ('2007-04-15', '42.1', '77.0'),
 ('2007-04-17', '46.9', '72.0

In [44]:
fieldNames = ['date', 'temperaturemin', 'temperaturemax']

In [45]:
from pyspark.sql.types import (StringType,StructField,IntegerType,DateType,BooleanType,
                               DoubleType,DecimalType,StructType)

In [46]:
field_struct = [StructField(name=field_name,dataType=StringType(),nullable=True) for field_name in fieldNames]

In [47]:
schema = StructType(fields=field_struct)

In [48]:
weather_df2 =  spark_sess.createDataFrame(data=weather_tuples,schema = schema)

In [49]:
type(weather_df2)

pyspark.sql.dataframe.DataFrame

In [50]:
weather_df2.show(5)

+----------+--------------+--------------+
|      date|temperaturemin|temperaturemax|
+----------+--------------+--------------+
|      date|temperaturemin|temperaturemax|
|2007-01-06|          50.0|          71.1|
|2007-01-09|          30.0|          55.0|
|2007-01-14|          50.0|          73.9|
|2007-01-15|          57.0|          73.9|
+----------+--------------+--------------+
only showing top 5 rows



### Schema Inference

In [51]:
from pyspark.sql.types import (StringType,StructField,IntegerType,DateType,BooleanType,
                               DoubleType,DecimalType,StructType)

In [33]:
'''
data_schema =[StructField(name='date',dataType=DateType(),nullable=True),
              StructField(name='rain',dataType=BooleanType(),nullable=True),
              StructField(name='snow',dataType=BooleanType(),nullable=True),
              StructField(name='ice',dataType=BooleanType(),nullable=True),
              StructField(name='fog',dataType=BooleanType(),nullable=True),
              StructField(name='temperaturemax',dataType=DecimalType(),nullable=True),
              StructField(name='temperaturemin',dataType=DecimalType(),nullable=True)
             ]
'''

In [52]:
data_schema =[StructField(name='date',dataType=StringType(),nullable=True),
              StructField(name='rain',dataType=StringType(),nullable=True),
              StructField(name='snow',dataType=StringType(),nullable=True),
              StructField(name='ice',dataType=StringType(),nullable=True),
              StructField(name='fog',dataType=StringType(),nullable=True),
              StructField(name='temperaturemax',dataType=DecimalType(),nullable=True),
              StructField(name='temperaturemin',dataType=DecimalType(),nullable=True)
             ]

In [53]:
data_schema

[StructField(date,StringType,true),
 StructField(rain,StringType,true),
 StructField(snow,StringType,true),
 StructField(ice,StringType,true),
 StructField(fog,StringType,true),
 StructField(temperaturemax,DecimalType(10,0),true),
 StructField(temperaturemin,DecimalType(10,0),true)]

In [54]:
final_schema = StructType(fields=data_schema)

In [55]:
df2 = spark_sess.read.json('rdu_weather_history.json',schema=final_schema)

In [56]:
df2.show()

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
|2007-01-22| Yes|  No| No|Yes|            41|            33|
|2007-01-24|  No|  No| No|Yes|            49|            30|
|2007-01-27|  No|  No| No| No|            64|            32|
|2007-01-30|  No|  No| No| No|            49|            24|
|2007-02-08|  No|  No| No| No|            48|            27|
|2007-02-12|  No|  No| No| No|            64|            26|
|2007-02-13| Yes|  No| No|Yes|            50|            38|
|2007-02-17|  No|  No| No| No|            48|            19|
|2007-02-21| Yes|  No| N

In [57]:
df2.select('date').show(5)

+----------+
|      date|
+----------+
|2007-01-06|
|2007-01-09|
|2007-01-14|
|2007-01-15|
|2007-01-20|
+----------+
only showing top 5 rows



In [58]:
df2.select(['date','rain','snow','ice','fog']).show(5)

+----------+----+----+---+---+
|      date|rain|snow|ice|fog|
+----------+----+----+---+---+
|2007-01-06| Yes|  No| No|Yes|
|2007-01-09| Yes|  No| No| No|
|2007-01-14|  No|  No| No| No|
|2007-01-15|  No|  No| No| No|
|2007-01-20|  No|  No| No| No|
+----------+----+----+---+---+
only showing top 5 rows



In [59]:
df2.head(5)  #list of row objects

[Row(date='2007-01-06', rain='Yes', snow='No', ice='No', fog='Yes', temperaturemax=Decimal('71'), temperaturemin=Decimal('50')),
 Row(date='2007-01-09', rain='Yes', snow='No', ice='No', fog='No', temperaturemax=Decimal('55'), temperaturemin=Decimal('30')),
 Row(date='2007-01-14', rain='No', snow='No', ice='No', fog='No', temperaturemax=Decimal('74'), temperaturemin=Decimal('50')),
 Row(date='2007-01-15', rain='No', snow='No', ice='No', fog='No', temperaturemax=Decimal('74'), temperaturemin=Decimal('57')),
 Row(date='2007-01-20', rain='No', snow='No', ice='No', fog='No', temperaturemax=Decimal('48'), temperaturemin=Decimal('26'))]

In [60]:
df2.head(5)[0]

Row(date='2007-01-06', rain='Yes', snow='No', ice='No', fog='Yes', temperaturemax=Decimal('71'), temperaturemin=Decimal('50'))

In [61]:
type(df2.select('date'))

pyspark.sql.dataframe.DataFrame

In [62]:
type(df2['date'])

pyspark.sql.column.Column

In [63]:
df2['date']

Column<b'date'>

In [64]:
type(df2.head(5))

list

In [65]:
type(df2.head(5)[0])

pyspark.sql.types.Row

In [66]:
df2.head()

Row(date='2007-01-06', rain='Yes', snow='No', ice='No', fog='Yes', temperaturemax=Decimal('71'), temperaturemin=Decimal('50'))

In [67]:
type(df2.head()[2])

str

In [68]:
type(df2.head()[5])

decimal.Decimal

In [69]:
#Reading multipel columns
df2_tmp = df2.select(['date','rain','snow','ice','fog','temperaturemax','temperaturemin'])

In [70]:
df2_tmp

DataFrame[date: string, rain: string, snow: string, ice: string, fog: string, temperaturemax: decimal(10,0), temperaturemin: decimal(10,0)]

In [71]:
df2_tmp.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [72]:
#Add a new column
#Rename a Column

#TBD

#### Saving Dataframe in another type(ex parquet.orc)

In [73]:
#DataFrames loaded from any data source type can be converted into other types using this syntax.

In [74]:
df2.select(['date','rain','snow','ice','fog','temperaturemax','temperaturemin']).write.save('weather.parquet',format='parquet')

In [75]:
#Now read the data from the parquet format file created above

parquet_df = spark_sess.read.load('weather.parquet',format='parquet')

In [76]:
parquet_df.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [77]:
type(parquet_df)

pyspark.sql.dataframe.DataFrame

In [78]:
df2.select(['date','rain','snow','ice','fog','temperaturemax','temperaturemin']).write.save('weather.orc',format='orc')

In [79]:
orc_df = spark_sess.read.load(path='weather.orc',format='orc')

In [80]:
orc_df.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



#### Run SQL on files directly

In [81]:
parquet_df3 = spark_sess.sql('SELECT * FROM parquet.`weather.parquet`')  #NOTE THE EXTRA QUOTES ON THE FILE NAME

In [82]:
type(parquet_df3)

pyspark.sql.dataframe.DataFrame

In [83]:
parquet_df3.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [84]:
orc_df4 = spark_sess.sql('SELECT * FROM orc.`weather.orc`')

In [85]:
type(orc_df4)

pyspark.sql.dataframe.DataFrame

In [86]:
orc_df4.show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [87]:
from pyspark.sql.functions import year
orc_df4.select(year(orc_df4['date'])).show(5)

+----------+
|year(date)|
+----------+
|      2007|
|      2007|
|      2007|
|      2007|
|      2007|
+----------+
only showing top 5 rows



In [89]:
orc_df4.count()

4870

### Saving Dataframes to Persistent Tables

In [91]:
#Store Dataframe as Table
orc_df4.write.option(key='path',value='G:/JupyterNotebook/weatherOrcTable').saveAsTable('weatherTableOrc')

##PLEASE MAKE SURE THAT THE DIRECTORY WHERE TABLE NEEDS TO BE CREATED SHOULD BE EMPTY ELSE IT WILL DELETE ALL ITEMS FROM THE FOLDER.


In [98]:
#Reading Data from the ORC Table created above
spark_sess.sql('SELECT * FROM weatherTableOrc').show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [96]:
pwd

'G:\\JupyterNotebook'

### Hive Tables --tbd

In [99]:
from os.path import expanduser, join, abspath
from pyspark.sql import Row,SparkSession

In [100]:
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

In [101]:
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

###### Create File

In [102]:
df2.select(['date','rain','snow','ice','fog','temperaturemax','temperaturemin']).show(5)

+----------+----+----+---+---+--------------+--------------+
|      date|rain|snow|ice|fog|temperaturemax|temperaturemin|
+----------+----+----+---+---+--------------+--------------+
|2007-01-06| Yes|  No| No|Yes|            71|            50|
|2007-01-09| Yes|  No| No| No|            55|            30|
|2007-01-14|  No|  No| No| No|            74|            50|
|2007-01-15|  No|  No| No| No|            74|            57|
|2007-01-20|  No|  No| No| No|            48|            26|
+----------+----+----+---+---+--------------+--------------+
only showing top 5 rows



In [103]:
df2.select(['date','rain','snow','ice','fog','temperaturemax','temperaturemin']).write.save('weather_subset.csv',format='csv')

In [None]:
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (date DATE, rain STRING,snow STRING, ice STRING,fog STRING, maxtemp FLOAT,mintemp FLOAT) USING hive")

### Conversion to/from Pandas

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes.

It is most beneficial to Python users that work with Pandas/NumPy data.

Its usage is not automatic and might require some minor changes to configuration or code to take full advantage and ensure compatibility.

The below steps give a high-level description of how to use Arrow in Spark and highlight any differences when working with Arrow-enabled data.

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the call toPandas() and when creating a Spark DataFrame from a Pandas DataFrame with createDataFrame(pandas_df).

To use Arrow when executing these calls, users need to first set the Spark configuration spark.sql.execution.arrow.enabled to true. This is disabled by default.

In addition, optimizations enabled by spark.sql.execution.arrow.enabled could fallback automatically to non-Arrow optimization implementation if an error occurs before the actual computation within Spark. This can be controlled by spark.sql.execution.arrow.fallback.enabled.


In [106]:
#install PyArrow package as :            pip install PyArrow

In [108]:
spark_sess.conf.get(key='spark.sql.execution.arrow.fallback.enabled')

'true'

In [109]:
spark_sess.conf.get(key='spark.sql.execution.arrow.enabled')

'false'

In [126]:
# Enable Arrow-based columnar data transfers

spark_sess.conf.set(key='spark.sql.execution.arrow.enabled',value = 'True')
spark_sess.conf.set(key='spark.sql.execution.arrow.fallback.enabled',value = 'True')

In [111]:
spark_sess.conf.get(key='spark.sql.execution.arrow.enabled')

'True'

In [112]:
import pandas as pd
import numpy as np

In [116]:
# Generate a Pandas DataFrame
panda_df = pd.DataFrame(np.random.rand(100, 3))

In [117]:
panda_df.head(5)

Unnamed: 0,0,1,2
0,0.646682,0.295599,0.188108
1,0.86758,0.025407,0.88277
2,0.215236,0.341696,0.02842
3,0.332491,0.136495,0.781741
4,0.808137,0.316163,0.096381


In [128]:
# Create a Spark DataFrame from a Pandas DataFrame using Arrow

spark_df = spark_sess.createDataFrame(data=panda_df)

  An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile.
: java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArro

In [129]:
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [130]:
spark_df.show(5)

+-------------------+--------------------+--------------------+
|                  0|                   1|                   2|
+-------------------+--------------------+--------------------+
| 0.6466815072461417| 0.29559880567387964|  0.1881078924128654|
| 0.8675804459797634|0.025406767711200318|  0.8827703588990432|
|0.21523618836107095|  0.3416955605871814|0.028420091935624114|
|0.33249072224381493| 0.13649470683698028|  0.7817411854792007|
| 0.8081371601327153|  0.3161631417044922| 0.09638121831188684|
+-------------------+--------------------+--------------------+
only showing top 5 rows



In [131]:
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_panda_df = spark_df.select("*").toPandas()

In [132]:
result_panda_df.head(5)

Unnamed: 0,0,1,2
0,0.646682,0.295599,0.188108
1,0.86758,0.025407,0.88277
2,0.215236,0.341696,0.02842
3,0.332491,0.136495,0.781741
4,0.808137,0.316163,0.096381


In [None]:
result_panda_df.shape

In [None]:
#Another way to create Pandas Dataframe
result_panda_df = spark_df.toPandas()

### Pandas UDFs (a.k.a. Vectorized UDFs) ---tbd