## INTRO
- Basics OP on Pyspark DataFrame

In [11]:
# OP 
import datetime as dt   
import time
import csv
import requests
import pandas as pd, numpy as np

# SPARK 
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from operator import add

In [12]:
# config 
conf = SparkConf().setAppName("LOAD PTT MYSQL DATABASE")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

## 1) Pyspark Read csv as Spark DataFrame

In [31]:
# load the data
df_boston = sqlContext.read\
                      .format('com.databricks.spark.csv')\
                      .options(header='true', inferschema='true')\
                      .load('boston.csv')

In [37]:
type(df_boston)

pyspark.sql.dataframe.DataFrame

In [36]:
df_boston

DataFrame[CRIM: double, ZN: double, INDUS: double, CHAS: double, NOX: double, RM: double, AGE: double, DIS: double, RAD: double, TAX: double, PTRATIO: double, B: double, LSTAT: double, price: double]

In [38]:
df_boston.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- price: double (nullable = true)



In [39]:
df_boston.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT',
 'price']

In [41]:
df_boston.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|              CRIM|                ZN|             INDUS|              CHAS|                NOX|                RM|               AGE|              DIS|              RAD|               TAX|           PTRATIO|                 B|             LSTAT|             price|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|               506|               506|               506|               506|                506|               506|               506|              506|              

In [54]:
df_boston.show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|price|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0|   15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87| 0.0|0.524|5

## 2) Manually set DataFrame schema

In [43]:
from pyspark.sql.types import (StructField, StructType,
                               IntegerType, StringType, LongType)

In [48]:
data_schema = [StructField('CRIM',StringType(), True )]

In [49]:
final_struc = StructType(fields= data_schema)

In [50]:
# reload the csv BUT WITH PRE-DEFINED SCHEMA AS ABOVE 
df_boston_updated = sqlContext.read\
                      .format('com.databricks.spark.csv')\
                      .options(header='true', inferschema='true', shema= final_struc)\
                      .load('boston.csv')

In [52]:
df_boston_updated.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- price: double (nullable = true)



## 3) Select one column from Spark DF 

In [63]:
# pandas way 
# which is not working here 
type(df_boston['CRIM'])
# df_boston['CRIM'].show() <--- not work

pyspark.sql.column.Column

In [62]:
# pyspark way 
type(df_boston.select('CRIM'))

pyspark.sql.dataframe.DataFrame

In [71]:
# SELECT 1 coluumn
df_boston.select('CRIM').show()

+-------+
|   CRIM|
+-------+
|0.00632|
|0.02731|
|0.02729|
|0.03237|
|0.06905|
|0.02985|
|0.08829|
|0.14455|
|0.21124|
|0.17004|
|0.22489|
|0.11747|
|0.09378|
|0.62976|
|0.63796|
|0.62739|
|1.05393|
| 0.7842|
|0.80271|
| 0.7258|
+-------+
only showing top 20 rows



In [70]:
# SELECT multiple coluumns
df_boston.select(['CRIM','B']).show()

+-------+------+
|   CRIM|     B|
+-------+------+
|0.00632| 396.9|
|0.02731| 396.9|
|0.02729|392.83|
|0.03237|394.63|
|0.06905| 396.9|
|0.02985|394.12|
|0.08829| 395.6|
|0.14455| 396.9|
|0.21124|386.63|
|0.17004|386.71|
|0.22489|392.52|
|0.11747| 396.9|
|0.09378| 390.5|
|0.62976| 396.9|
|0.63796|380.02|
|0.62739|395.62|
|1.05393|386.85|
| 0.7842|386.75|
|0.80271|288.99|
| 0.7258|390.95|
+-------+------+
only showing top 20 rows



In [68]:
df_boston.head(2)

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0.0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1.0, TAX=296.0, PTRATIO=15.3, B=396.9, LSTAT=4.98, price=24.0),
 Row(CRIM=0.02731, ZN=0.0, INDUS=7.07, CHAS=0.0, NOX=0.469, RM=6.421, AGE=78.9, DIS=4.9671, RAD=2.0, TAX=242.0, PTRATIO=17.8, B=396.9, LSTAT=9.14, price=21.6)]

In [69]:
df_boston.head(2)[0]

Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0.0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1.0, TAX=296.0, PTRATIO=15.3, B=396.9, LSTAT=4.98, price=24.0)

### 4) Create a  new column

In [84]:
# add new column : colX
df_boston.withColumn('colX', df_boston['B']).show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|price|  colX|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0| 396.9|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6| 396.9|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|392.83|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|394.63|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2| 396.9|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|394.12|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43| 22.9| 395.6|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|

In [78]:
# add new column : colY
df_boston.withColumn('colY', df_boston['B']*2).show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|price|  colY|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0| 793.8|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6| 793.8|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|785.66|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|789.26|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2| 793.8|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|788.24|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43| 22.9| 791.2|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|

## 5) Rename columns 

In [83]:
# rename column "B" -> "BBB"
df_boston.withColumnRenamed('B', 'BBB').show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|   BBB|LSTAT|price|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0|   15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87| 0.0|0.524|5

## 6) Pyspark SQL

In [90]:
# create a temp SQL view from df 
df_boston.createOrReplaceTempView('BOSTON')

In [91]:
result=sqlContext.sql("select B from BOSTON LIMIT 10").show()

+------+
|     B|
+------+
| 396.9|
| 396.9|
|392.83|
|394.63|
| 396.9|
|394.12|
| 395.6|
| 396.9|
|386.63|
|386.71|
+------+



## 7) Filter  data

In [96]:
result2=df_boston.filter("B < 50").show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|   CRIM| ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS| RAD|  TAX|PTRATIO|    B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|51.1358|0.0| 18.1| 0.0|0.597|5.757|100.0| 1.413|24.0|666.0|   20.2|  2.6|10.11| 15.0|
|14.0507|0.0| 18.1| 0.0|0.597|6.657|100.0|1.5275|24.0|666.0|   20.2|35.05|21.22| 17.2|
| 18.811|0.0| 18.1| 0.0|0.597|4.628|100.0|1.5539|24.0|666.0|   20.2|28.79|34.37| 17.9|
|18.0846|0.0| 18.1| 0.0|0.679|6.434|100.0|1.8347|24.0|666.0|   20.2|27.25|29.05|  7.2|
|10.8342|0.0| 18.1| 0.0|0.679|6.782| 90.8|1.8195|24.0|666.0|   20.2|21.57|25.79|  7.5|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0|   20.2|16.45|20.62|  8.8|
|11.8123|0.0| 18.1| 0.0|0.718|6.824| 76.5| 1.794|24.0|666.0|   20.2|48.45|22.74|  8.4|
|7.05042|0.0| 18.1| 0.0|0.614|6.103| 85.1|2.0218|24.0|666.0|   20.2| 2.52|23.29| 13.4|
|8.79212|0.0| 18.1| 0.0|0.584|5.565| 70.6|2

In [97]:
result2=df_boston.filter("B < 50").select(['B']).show()

+-----+
|    B|
+-----+
|  2.6|
|35.05|
|28.79|
|27.25|
|21.57|
|16.45|
|48.45|
| 2.52|
| 3.65|
| 7.68|
|24.65|
|18.82|
|27.49|
| 9.32|
|43.06|
| 0.32|
| 6.68|
|10.48|
|  3.5|
|22.01|
+-----+



In [98]:
# python way 
result2=df_boston.filter(df_boston['B']<50).show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|   CRIM| ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS| RAD|  TAX|PTRATIO|    B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|51.1358|0.0| 18.1| 0.0|0.597|5.757|100.0| 1.413|24.0|666.0|   20.2|  2.6|10.11| 15.0|
|14.0507|0.0| 18.1| 0.0|0.597|6.657|100.0|1.5275|24.0|666.0|   20.2|35.05|21.22| 17.2|
| 18.811|0.0| 18.1| 0.0|0.597|4.628|100.0|1.5539|24.0|666.0|   20.2|28.79|34.37| 17.9|
|18.0846|0.0| 18.1| 0.0|0.679|6.434|100.0|1.8347|24.0|666.0|   20.2|27.25|29.05|  7.2|
|10.8342|0.0| 18.1| 0.0|0.679|6.782| 90.8|1.8195|24.0|666.0|   20.2|21.57|25.79|  7.5|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0|   20.2|16.45|20.62|  8.8|
|11.8123|0.0| 18.1| 0.0|0.718|6.824| 76.5| 1.794|24.0|666.0|   20.2|48.45|22.74|  8.4|
|7.05042|0.0| 18.1| 0.0|0.614|6.103| 85.1|2.0218|24.0|666.0|   20.2| 2.52|23.29| 13.4|
|8.79212|0.0| 18.1| 0.0|0.584|5.565| 70.6|2

In [100]:
#####  filter in multiple conditions #####

df_boston.filter((df_boston['B'] < 50 )  & (df_boston['B'] > 10 )  ).show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|   CRIM| ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS| RAD|  TAX|PTRATIO|    B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|14.0507|0.0| 18.1| 0.0|0.597|6.657|100.0|1.5275|24.0|666.0|   20.2|35.05|21.22| 17.2|
| 18.811|0.0| 18.1| 0.0|0.597|4.628|100.0|1.5539|24.0|666.0|   20.2|28.79|34.37| 17.9|
|18.0846|0.0| 18.1| 0.0|0.679|6.434|100.0|1.8347|24.0|666.0|   20.2|27.25|29.05|  7.2|
|10.8342|0.0| 18.1| 0.0|0.679|6.782| 90.8|1.8195|24.0|666.0|   20.2|21.57|25.79|  7.5|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0|   20.2|16.45|20.62|  8.8|
|11.8123|0.0| 18.1| 0.0|0.718|6.824| 76.5| 1.794|24.0|666.0|   20.2|48.45|22.74|  8.4|
|12.2472|0.0| 18.1| 0.0|0.584|5.837| 59.7|1.9976|24.0|666.0|   20.2|24.65|15.69| 10.2|
|37.6619|0.0| 18.1| 0.0|0.679|6.202| 78.7|1.8629|24.0|666.0|   20.2|18.82|14.52| 10.9|
|14.4208|0.0| 18.1| 0.0| 0.74|6.461| 93.3|2

## 8) Get filter spark data as dict, array.. and process on them

In [101]:
filer_result=df_boston.filter((df_boston['B'] < 50 )  & (df_boston['B'] > 10 )  ).collect()

In [102]:
filer_result

[Row(CRIM=14.0507, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.597, RM=6.657, AGE=100.0, DIS=1.5275, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=35.05, LSTAT=21.22, price=17.2),
 Row(CRIM=18.811, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.597, RM=4.628, AGE=100.0, DIS=1.5539, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=28.79, LSTAT=34.37, price=17.9),
 Row(CRIM=18.0846, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.679, RM=6.434, AGE=100.0, DIS=1.8347, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=27.25, LSTAT=29.05, price=7.2),
 Row(CRIM=10.8342, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.679, RM=6.782, AGE=90.8, DIS=1.8195, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=21.57, LSTAT=25.79, price=7.5),
 Row(CRIM=73.5341, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.679, RM=5.957, AGE=100.0, DIS=1.8026, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=16.45, LSTAT=20.62, price=8.8),
 Row(CRIM=11.8123, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.718, RM=6.824, AGE=76.5, DIS=1.794, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=48.45, LSTAT=22.74, price=8.4),
 Row(CRIM=12.2472, ZN=0.0, IND

In [103]:
type(filer_result)

list

In [104]:
filer_result[0]

Row(CRIM=14.0507, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.597, RM=6.657, AGE=100.0, DIS=1.5275, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=35.05, LSTAT=21.22, price=17.2)

In [107]:
filer_result[0].asDict()

{'AGE': 100.0,
 'B': 35.05,
 'CHAS': 0.0,
 'CRIM': 14.0507,
 'DIS': 1.5275,
 'INDUS': 18.1,
 'LSTAT': 21.22,
 'NOX': 0.597,
 'PTRATIO': 20.2,
 'RAD': 24.0,
 'RM': 6.657,
 'TAX': 666.0,
 'ZN': 0.0,
 'price': 17.2}

In [108]:
filer_result[0].asDict()['AGE']

100.0

In [109]:
# END OF COURSE 8.26 
# next 8.27