#Spark - Dataframe Introduction
Comparing Panda's dataframe functionality and syntax to Spark's dataframe

In [1]:
from pyspark.sql import Row
from pyspark.sql import functions as Func
import pandas as pd
import numpy as np

###Creating a dataframe

In [2]:
p_weather = pd.read_csv('inmar/weather/2007_small.csv', names = ['station', 'date', 'element', 'value', 'measurement', 'quality', 'source', 'hour'])

In [3]:
rows = sc.textFile('inmar/weather/2007_small.csv')
parts = rows.map(lambda l: l.split(","))

# Set the schema for the data
schema_weather = parts.map(lambda p: Row(station=str(p[0]), date=str(p[1]), element=str(p[2]), value=float(p[3]), measurement=str(p[4]), quality=str(p[5]), source= str(p[5]), hour = str(p[7])))

# Infer the schema, and register the DataFrame as a table.
s_weather = sqlContext.createDataFrame(schema_weather)
s_weather.registerTempTable("weather")

In [4]:
s_weather.printSchema()

root
 |-- date: string (nullable = true)
 |-- element: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- measurement: string (nullable = true)
 |-- quality: string (nullable = true)
 |-- source: string (nullable = true)
 |-- station: string (nullable = true)
 |-- value: double (nullable = true)



###Show the head rows

In [5]:
p_weather.head()

Unnamed: 0,station,date,element,value,measurement,quality,source,hour
0,CA002303986,20070101,TMAX,-130,,,G,
1,CA002303986,20070101,TMIN,-220,,,G,
2,CA002303986,20070101,PRCP,0,T,,G,
3,ASN00037003,20070101,PRCP,0,,,a,
4,NOE00133566,20070101,TMAX,66,,,E,


In [6]:
s_weather.head(3)

[Row(date=u'20070101', element=u'TMAX', hour=u'', measurement=u'', quality=u'', source=u'', station=u'CA002303986', value=-130.0),
 Row(date=u'20070101', element=u'TMIN', hour=u'', measurement=u'', quality=u'', source=u'', station=u'CA002303986', value=-220.0),
 Row(date=u'20070101', element=u'PRCP', hour=u'', measurement=u'T', quality=u'', source=u'', station=u'CA002303986', value=0.0)]

###Show the shape of the data

In [7]:
p_weather.shape

(49999, 8)

In [8]:
s_weather.count()

49999

###Unique values in a column

In [9]:
p_weather['quality'].unique()

array([nan, 'I', 'D', 'S', 'L', 'G', 'O', 'X', 'N'], dtype=object)

In [10]:
s_weather[['quality']].distinct().collect()

[Row(quality=u'D'),
 Row(quality=u'G'),
 Row(quality=u'I'),
 Row(quality=u'L'),
 Row(quality=u'N'),
 Row(quality=u'O'),
 Row(quality=u'S'),
 Row(quality=u'X'),
 Row(quality=u'')]

###Describe - statistics on a column

In [11]:
p_weather['value'].describe()

count    49999.000000
mean        30.459649
std        116.482134
min       -994.000000
25%          0.000000
50%          0.000000
75%         67.000000
max       2404.000000
Name: value, dtype: float64

In [12]:
s_weather[['value']].describe().show()

+-------+------------------+
|summary|             value|
+-------+------------------+
|  count|             49999|
|   mean| 30.45964919298386|
| stddev|116.48096927468023|
|    min|            -994.0|
|    max|            2404.0|
+-------+------------------+



###Filtering

In [13]:
p_one_station_tmax = p_weather[(p_weather['element'] == 'TMAX') & (p_weather['station'] == 'USC00114078')]
p_one_station_tmax

Unnamed: 0,station,date,element,value,measurement,quality,source,hour
3725,USC00114078,20070101,TMAX,128,,,0,1800
49569,USC00114078,20070102,TMAX,83,,,0,1800


In [14]:
w_one_station_tmax = s_weather[(s_weather['element'] == 'TMAX') & (s_weather['station'] == 'USC00114078')]
w_one_station_tmax.collect()

[Row(date=u'20070101', element=u'TMAX', hour=u'1800.0', measurement=u'', quality=u'', source=u'', station=u'USC00114078', value=128.0),
 Row(date=u'20070102', element=u'TMAX', hour=u'1800.0', measurement=u'', quality=u'', source=u'', station=u'USC00114078', value=83.0)]

###Simple metric from a column

In [15]:
p_one_station_tmax['value'].mean()

105.5

In [16]:
w_one_station_tmax.groupby('element').agg(Func.mean('value')).collect()

[Row(element=u'TMAX', avg(value)=105.5)]

###aggregating - groupby

In [17]:
p_weather[p_weather['element'] == 'TMIN'].groupby('date').aggregate([max, min, np.mean])['value']

Unnamed: 0_level_0,max,min,mean
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
20070101,600,-994,-28.724394
20070102,282,-509,-23.213836


In [18]:
s_weather[s_weather['element'] == 'TMIN'].groupby('date').agg(Func.max('value'),Func.min('value'), Func.mean('value')).show()

+--------+----------+----------+------------------+
|    date|max(value)|min(value)|        avg(value)|
+--------+----------+----------+------------------+
|20070101|     600.0|    -994.0|-28.72439402595283|
|20070102|     282.0|    -509.0|-23.21383647798742|
+--------+----------+----------+------------------+



###adding a column

In [19]:
p_weather['reset_value'] = 0.1

In [20]:
s_weather = s_weather.withColumn('reset_value', Func.lit(0.1))

###multiplying one column by the other

In [21]:
# a few operations that you can do in Pandas don’t translate to Spark well. Please remember that DataFrames in Spark are like RDD in the sense that they’re an immutable data structure. Therefore things like:

(p_weather['value'] * p_weather['reset_value']).head(3)

0   -13
1   -22
2     0
dtype: float64

In [22]:
s_weather.withColumn('r.value * Stan
 
Many thanks for taking the time to meet today over lunch and thank you for paying.  I am glad I know you.  It is good to have solid software talent in town.
 
I look forward to seeing you at the next Data Science meet up.  If I don’t see you before the new year, I will drop you a line in January to catch up.s_weather.reset_value).show(3)
# s_weather.withColumn('multiplier', s_weather.value * s_weather.date).show(3)

+--------+-------+----+-----------+-------+------+-----------+------+-----------+----------+
|    date|element|hour|measurement|quality|source|    station| value|reset_value|multiplier|
+--------+-------+----+-----------+-------+------+-----------+------+-----------+----------+
|20070101|   TMAX|    |           |       |      |CA002303986|-130.0|        0.1|     -13.0|
|20070101|   TMIN|    |           |       |      |CA002303986|-220.0|        0.1|     -22.0|
|20070101|   PRCP|    |          T|       |      |CA002303986|   0.0|        0.1|       0.0|
+--------+-------+----+-----------+-------+------+-----------+------+-----------+----------+
only showing top 3 rows



###multiplying a column by a value

In [23]:
(p_weather['value'] * 0.1).head(3)

0   -13
1   -22
2     0
Name: value, dtype: float64

In [24]:
s_weather.withColumn('temp', s_weather['value'] * 0.1).show(3)

+--------+-------+----+-----------+-------+------+-----------+------+-----------+-----+
|    date|element|hour|measurement|quality|source|    station| value|reset_value| temp|
+--------+-------+----+-----------+-------+------+-----------+------+-----------+-----+
|20070101|   TMAX|    |           |       |      |CA002303986|-130.0|        0.1|-13.0|
|20070101|   TMIN|    |           |       |      |CA002303986|-220.0|        0.1|-22.0|
|20070101|   PRCP|    |          T|       |      |CA002303986|   0.0|        0.1|  0.0|
+--------+-------+----+-----------+-------+------+-----------+------+-----------+-----+
only showing top 3 rows



###Convert from spark dataframe to a pandas and vice versa

In [25]:
type(sqlContext.createDataFrame(p_weather))

pyspark.sql.dataframe.DataFrame

In [26]:
type(s_weather.toPandas())

pandas.core.frame.DataFrame