# Elementary Syntax: RDD -> sparkDF -> pandasDF

In [1]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import random
from IPython.display import display

## Create a dummy dataset        



In [2]:
ROWS = 1000
COLUMNS = 5


data = []
for i in range(ROWS*COLUMNS):
    sample = random.random()
    data.append(sample)

    
data = np.array(data)

data = data.reshape((ROWS,COLUMNS))
pandasDF = pd.DataFrame(data)




column_names = []
for i in range(COLUMNS):
    name = "column_%s" % i
    column_names.append(name)

pandasDF.columns = column_names

print "data as a pandasDF"
display(pandasDF.head())

data as a pandasDF


Unnamed: 0,column_0,column_1,column_2,column_3,column_4
0,0.250237,0.183555,0.215014,0.782768,0.695688
1,0.650195,0.16661,0.933383,0.089396,0.141637
2,0.238586,0.977994,0.700689,0.227258,0.8291
3,0.148277,0.809591,0.955313,0.304549,0.329506
4,0.797299,0.49757,0.636135,0.372165,0.207162


## Spin up a Spark Session and create a sparkDF

In [3]:
### Spark ###
spark = SparkSession\
        .builder\
        .appName("app_spark")\
        .getOrCreate()

# Create a sparkDF from the pandasDF
sparkDF = spark.createDataFrame(pandasDF)

# .first() method returns the first row of a sparkDF
print "\nprint first row of sparkDF:"
first = sparkDF.first()
print first

# .head(n) method returns the first n rows of a sparkDF
print "\nyou can also do the .head() of a sparkDF:"
print sparkDF.head(5)

print "\nyou can also get a prettier view of a sparkDF with .show()"
# .show() does not require a print statement for display
sparkDF.show()

print "done"


print first row of sparkDF:
Row(column_0=0.2502368192953367, column_1=0.18355460368749033, column_2=0.21501365160371155, column_3=0.7827677590075888, column_4=0.6956882060822042)

you can also do the .head() of a sparkDF:
[Row(column_0=0.2502368192953367, column_1=0.18355460368749033, column_2=0.21501365160371155, column_3=0.7827677590075888, column_4=0.6956882060822042), Row(column_0=0.6501949358841297, column_1=0.1666102817563173, column_2=0.9333827070652461, column_3=0.08939629392444737, column_4=0.1416373280034805), Row(column_0=0.23858583884070117, column_1=0.9779938973558728, column_2=0.7006892024313864, column_3=0.22725765489006255, column_4=0.8291002106884638), Row(column_0=0.14827669390957376, column_1=0.809591003707794, column_2=0.9553127036225423, column_3=0.30454916266712373, column_4=0.32950633262575335), Row(column_0=0.7972988119377827, column_1=0.4975702867006666, column_2=0.6361347158481788, column_3=0.3721650352152236, column_4=0.2071617655452449)]

you can also get a

# Convert back and forth to RDDs and pandasDFs


In [4]:
# Make an RDD form a sparkDF 
data_RDD = sparkDF.rdd

# pull the schema from the sparkDF for later
schema = sparkDF.schema
print "The original schema:\n",schema

print "\ndata_RDD before:\n", data_RDD.take(5)

# Now we can work with the data in with mapper and reducer paradigm
def mapper_function(row):
    """Scale every field of ``row`` by its own position.

    Field 0 is multiplied by 0, field 1 by 1, and so on, so for numeric
    rows the first field always comes back as zero.

    Args:
        row: a sequence of numbers (in this notebook, a Spark ``Row``).

    Returns:
        A new list ``[row[0]*0, row[1]*1, ...]`` with the same length as ``row``.
    """
    return [value * position for position, value in enumerate(row)]

new_RDD = data_RDD.map(lambda row: mapper_function(row))

print "\ndata_RDD after:\n", new_RDD.take(10)

# Convert the new RDD back to sparkDF 
# Make sure to load the 'schema=' argument
new_sparkDF = new_RDD.toDF(schema=schema)

# View the new sparkDF
new_sparkDF.first()
new_sparkDF.show(5)

# convert the sparkDF to a pandasDF 
new_pandasDF = new_sparkDF.toPandas()

display(new_pandasDF.head())

The original schema:
StructType(List(StructField(column_0,DoubleType,true),StructField(column_1,DoubleType,true),StructField(column_2,DoubleType,true),StructField(column_3,DoubleType,true),StructField(column_4,DoubleType,true)))

data_RDD before:
[Row(column_0=0.2502368192953367, column_1=0.18355460368749033, column_2=0.21501365160371155, column_3=0.7827677590075888, column_4=0.6956882060822042), Row(column_0=0.6501949358841297, column_1=0.1666102817563173, column_2=0.9333827070652461, column_3=0.08939629392444737, column_4=0.1416373280034805), Row(column_0=0.23858583884070117, column_1=0.9779938973558728, column_2=0.7006892024313864, column_3=0.22725765489006255, column_4=0.8291002106884638), Row(column_0=0.14827669390957376, column_1=0.809591003707794, column_2=0.9553127036225423, column_3=0.30454916266712373, column_4=0.32950633262575335), Row(column_0=0.7972988119377827, column_1=0.4975702867006666, column_2=0.6361347158481788, column_3=0.3721650352152236, column_4=0.20716176554524

Unnamed: 0,column_0,column_1,column_2,column_3,column_4
0,0.0,0.183555,0.430027,2.348303,2.782753
1,0.0,0.16661,1.866765,0.268189,0.566549
2,0.0,0.977994,1.401378,0.681773,3.316401
3,0.0,0.809591,1.910625,0.913647,1.318025
4,0.0,0.49757,1.272269,1.116495,0.828647


## You can use Pandas like syntax to filter sparkDFs

In [5]:
# Index a sparkDF with a boolean Column expression, just like a pandas mask.
column_0_is_high = sparkDF["column_0"] >= .99
filteredDF = sparkDF[column_0_is_high]
filteredDF.show(100)

+------------------+--------------------+-------------------+--------------------+--------------------+
|          column_0|            column_1|           column_2|            column_3|            column_4|
+------------------+--------------------+-------------------+--------------------+--------------------+
|0.9993951595211199|0.029367447254841195| 0.6685370712082788|  0.6236610054053627| 0.13127968510727994|
|0.9997610154240947|  0.6854425950558367| 0.6559377564159153|  0.9308906186030844|  0.6896996549583291|
| 0.997127359988409|  0.9229308812309674| 0.1146010965843266| 0.18753752035377214| 0.18385179543655217|
|0.9998557721426741|  0.4035282104235407|  0.486204908051453|  0.3896757218590855|  0.4724727281286535|
|0.9922742665307508|  0.2811508406364178| 0.8241255306763826|  0.0588272205327377| 0.27403606754227827|
|0.9914236271467993|  0.8820785921937543| 0.7244537217491043|  0.4583781816898216| 0.22388193649896304|
|0.9929329614058017|  0.7312139862363859|0.21805864514244444|0.0

* Note: the `.select()` method is used for grabbing a single column (or a subset of columns) by name
* This is **different** from pandas syntax

In [6]:
# Project down to one column by name, then print its first 10 values.
sparkDF.select("column_0").show(n=10)

+-------------------+
|           column_0|
+-------------------+
| 0.2502368192953367|
| 0.6501949358841297|
|0.23858583884070117|
|0.14827669390957376|
| 0.7972988119377827|
| 0.4518485795507049|
| 0.5702922917664767|
|0.35163435406084087|
| 0.5404611264884952|
|  0.659618623388003|
+-------------------+
only showing top 10 rows



## Get summary statistics similar to pandas with the `describe()` method

In [7]:
description = sparkDF.describe()
display(description.toPandas())

means = description[description["summary"]=="mean"].collect()

means = list(np.array(means)[0][1:])
print "means", means

stddevs = description[description["summary"]=="stddev"].collect()

stddevs = list(np.array(stddevs)[0][1:])
print "stddevs", stddevs



Unnamed: 0,summary,column_0,column_1,column_2,column_3,column_4
0,count,1000.0,1000.0,1000.0,1000.0,1000.0
1,mean,0.503540568335868,0.4910500448849662,0.5046307212741203,0.5018073505560066,0.4989487306760322
2,stddev,0.2925918332707907,0.2896823828947518,0.2913106882049708,0.2847334662311207,0.291801714052591
3,min,0.00013340337608869213,0.000378184126901937,0.0008000008504710499,0.0007376032718799941,0.0004699503681363515
4,max,0.9999048079458932,0.9991836291434502,0.9990497752859387,0.9983956844507894,0.9998578469548808


means [u'0.503540568335868', u'0.49105004488496623', u'0.5046307212741203', u'0.5018073505560066', u'0.49894873067603224']
stddevs [u'0.2925918332707907', u'0.2896823828947518', u'0.2913106882049708', u'0.2847334662311207', u'0.291801714052591']


You can cache sparkDFs just as you can with RDDs


In [8]:
# Check if sparkDF is Cashed
print  "What is the BEFORE cashed status of the original sparkDF?", sparkDF.is_cached

# Duplicate sparkDF as cashedDF using SQL style commands
sparkDF.createOrReplaceTempView("cashedDF")
cashedDF = spark.sql("select * from cashedDF")
print "New cashedDF object is of type:", type(cashedDF)

print "What is the BEFORE cashed status of cashedDF?", cashedDF.is_cached

# use the .cashe method to cash the new DF
cashedDF.cache()

print "What is the AFTER cashed status of cashedDF?", cashedDF.is_cached


print "What is the AFTER cashed status of the original sparkDF?", sparkDF.is_cached

What is the BEFORE cashed status of the original sparkDF? False
New cashedDF object is of type: <class 'pyspark.sql.dataframe.DataFrame'>
What is the BEFORE cashed status of cashedDF? False
What is the AFTER cashed status of cashedDF? True
What is the AFTER cashed status of the original sparkDF? False


## Note: assigning `newDF = sparkDF` does **NOT** make a new DF object; newDF is just a second name for the same object:

In [9]:
# Check if sparkDF is Cashed
print  "What is the BEFORE cashed status of the original sparkDF?", sparkDF.is_cached

newDF = sparkDF

print "What is the BEFORE cashed status of newDF?", newDF.is_cached

# use the .cashe method to cash the new DF
newDF.cache()

print "What is the AFTER cashed status of newDF?", newDF.is_cached

# Cashing newDF cashes the original sparkDF since newDF is just a pointer asignment
print "What is the AFTER cashed status of the original sparkDF?", sparkDF.is_cached

What is the BEFORE cashed status of the original sparkDF? False
What is the BEFORE cashed status of newDF? False
What is the AFTER cashed status of newDF? True
What is the AFTER cashed status of the original sparkDF? True


## Manipulating sparkDFs using `.withColumn()`


In [10]:
# Create a new sparkDF from the original DF by querying it through a temp view.
# (.withColumn()/.drop() return new DataFrames, so sparkDF itself is not modified below.)
sparkDF.createOrReplaceTempView("sparkDF_standardized")
sparkDF_standardized = spark.sql("select * from sparkDF_standardized")

# Replace each column_i with its z-score (x - mean) / stddev, one column at a time.
for i in range(COLUMNS):
    new_name = "column_%s_standardized" % i
    old_name = "column_%s" % i
    # The describe() statistics may be strings; cast explicitly rather than relying
    # on Spark's implicit string -> double coercion inside the column expression.
    mean_i = float(means[i])
    stddev_i = float(stddevs[i])
    sparkDF_standardized = sparkDF_standardized.withColumn(
        new_name, (sparkDF_standardized[old_name] - mean_i) / stddev_i)
    sparkDF_standardized = sparkDF_standardized.drop(old_name)

sparkDF_standardized.show(5)

+---------------------+---------------------+---------------------+---------------------+---------------------+
|column_0_standardized|column_1_standardized|column_2_standardized|column_3_standardized|column_4_standardized|
+---------------------+---------------------+---------------------+---------------------+---------------------+
|  -0.8657239206192064|   -1.061491686599846|  -0.9941862121674047|   0.9867488081771296|   0.6742231657023032|
|   0.5012250885773345|  -1.1199844460208246|   1.4718031406022634|  -1.4484109019238935|  -1.2245006984696918|
|  -0.9055438305749445|   1.6809577703847016|   0.6730219284618099|  -0.9642340231379966|    1.131424059943891|
|  -1.2141961395663667|   1.0996214393137191|   1.5470835798213822|  -0.6927818865129476|  -0.5806765001370179|
|   1.0039864760334385|  0.02250824420366545|   0.4514217977526352|  -0.4553111267770895|  -0.9999494556697156|
+---------------------+---------------------+---------------------+---------------------+---------------

## This can also be done with a plain Python helper function applied to Columns (not a registered Spark UDF)

In [11]:
def standardize(x, mean, std):
    """Return the z-score ``(x - mean) / std``.

    Only ``std`` is passed through ``float()``; ``x`` and ``mean`` are used
    as given. In this notebook ``x`` is a Spark Column, so the result is a
    Column expression rather than a number.
    """
    centered = x - mean
    return centered / float(std)


# Same standardization as the previous section, but the arithmetic lives in the
# reusable standardize() helper. Applied to a Column, its - and / operators build
# a Spark column expression; no Python code runs per row.
sparkDF.createOrReplaceTempView("sparkDF_standardized2")
sparkDF_standardized2 = spark.sql("select * from sparkDF_standardized2")

for i in range(COLUMNS):
    new_name = "column_%s_standardized2" % i
    old_name = "column_%s" % i
    # standardize() only casts its std argument, and the describe() means may be
    # strings, so cast the mean explicitly here.
    sparkDF_standardized2 = sparkDF_standardized2.withColumn(
        new_name, standardize(sparkDF_standardized2[old_name], float(means[i]), stddevs[i]))
    sparkDF_standardized2 = sparkDF_standardized2.drop(old_name)

sparkDF_standardized2.show(5)

+----------------------+----------------------+----------------------+----------------------+----------------------+
|column_0_standardized2|column_1_standardized2|column_2_standardized2|column_3_standardized2|column_4_standardized2|
+----------------------+----------------------+----------------------+----------------------+----------------------+
|   -0.8657239206192064|    -1.061491686599846|   -0.9941862121674047|    0.9867488081771296|    0.6742231657023032|
|    0.5012250885773345|   -1.1199844460208246|    1.4718031406022634|   -1.4484109019238935|   -1.2245006984696918|
|   -0.9055438305749445|    1.6809577703847016|    0.6730219284618099|   -0.9642340231379966|     1.131424059943891|
|   -1.2141961395663667|    1.0996214393137191|    1.5470835798213822|   -0.6927818865129476|   -0.5806765001370179|
|    1.0039864760334385|   0.02250824420366545|    0.4514217977526352|   -0.4553111267770895|   -0.9999494556697156|
+----------------------+----------------------+-----------------