## Spark Tutorial 2 - Spark DataFrame Basic Operations

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Reuse the active SparkSession if there is one; otherwise start a new one named 'ops'.
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

## When reading a CSV file, Spark can infer the schema (`inferSchema=True`)

In [3]:
# Location of the sample Apple stock-price CSV used throughout this tutorial.
# Set the SPARK_TUTORIAL_DATA_DIR environment variable to point at your own copy
# instead of editing a machine-specific absolute path. The default reproduces
# the path this notebook was originally run with.
import os
from pathlib import Path

DATA_DIR = Path(os.environ.get(
    'SPARK_TUTORIAL_DATA_DIR',
    '/home/robert/Downloads/Python-and-Spark-for-Big-Data-master/Spark_DataFrames',
))
source_file = str(DATA_DIR / 'appl_stock.csv')

In [5]:
# Load the CSV: the first line supplies column names (header), and Spark scans
# the data to infer each column's type (inferSchema) instead of reading everything as strings.
df = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(source_file)
)

In [7]:
# Print the column names, inferred types, and nullability as a tree.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [10]:
# Peek at the first record as a Row object. df.first() fetches only one row,
# whereas df.head(3)[0] pulled three rows to the driver and discarded two.
df.first()

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [8]:
# Render the first 20 rows as a text table (PySpark's defaults, spelled out;
# long cell values are truncated).
df.show(n=20, truncate=True)

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

## How to filter a DataFrame with SQL syntax

In [14]:
# Filter with a SQL expression string (where is an alias of filter),
# then keep only three columns for display.
df.where("Close < 500").select('Date', 'Open', 'Close').show()

+-------------------+------------------+------------------+
|               Date|              Open|             Close|
+-------------------+------------------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.009998|
|2010-01-05 00:00:00|        214.599998|        214.379993|
|2010-01-06 00:00:00|        214.379993|        210.969995|
|2010-01-07 00:00:00|            211.75|            210.58|
|2010-01-08 00:00:00|        210.299994|211.98000499999998|
|2010-01-11 00:00:00|212.79999700000002|210.11000299999998|
|2010-01-12 00:00:00|209.18999499999998|        207.720001|
|2010-01-13 00:00:00|        207.870005|        210.650002|
|2010-01-14 00:00:00|210.11000299999998|            209.43|
|2010-01-15 00:00:00|210.92999500000002|            205.93|
|2010-01-19 00:00:00|        208.330002|        215.039995|
|2010-01-20 00:00:00|        214.910006|            211.73|
|2010-01-21 00:00:00|        212.079994|        208.069996|
|2010-01-22 00:00:00|206.78000600000001|

## How to filter the Pythonic way (with Column expressions)

In [16]:
# Same filter as above, but built from a Column expression instead of a SQL string.
(
    df.filter(df['Close'] < 500)
    .select('Date', 'Open', 'Close')
    .show()
)

+-------------------+------------------+------------------+
|               Date|              Open|             Close|
+-------------------+------------------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.009998|
|2010-01-05 00:00:00|        214.599998|        214.379993|
|2010-01-06 00:00:00|        214.379993|        210.969995|
|2010-01-07 00:00:00|            211.75|            210.58|
|2010-01-08 00:00:00|        210.299994|211.98000499999998|
|2010-01-11 00:00:00|212.79999700000002|210.11000299999998|
|2010-01-12 00:00:00|209.18999499999998|        207.720001|
|2010-01-13 00:00:00|        207.870005|        210.650002|
|2010-01-14 00:00:00|210.11000299999998|            209.43|
|2010-01-15 00:00:00|210.92999500000002|            205.93|
|2010-01-19 00:00:00|        208.330002|        215.039995|
|2010-01-20 00:00:00|        214.910006|            211.73|
|2010-01-21 00:00:00|        212.079994|        208.069996|
|2010-01-22 00:00:00|206.78000600000001|

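Filtering on multiple conditions:

Scenario: retrieve all rows that have a closing price below \$200.00 and an opening price above \$200.00.
 - use the ampersand (`&`) for AND (`|` for OR, `~` for NOT); Python's `and`/`or`/`not` keywords do not work on Column expressions
 - enclose each condition in parentheses, because `&` binds more tightly than comparison operators such as `<` and `>`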

In [19]:
# Both conditions must hold. Column conditions are combined with `&` (Python's
# `and` does not work on Columns), and each comparison needs its own
# parentheses because `&` binds tighter than `<` and `>`.
(
    df.filter((df['Open'] > 200) & (df['Close'] < 200))
    .select('Date', 'Open', 'Close')
    .show()
)

+-------------------+------------------+----------+
|               Date|              Open|     Close|
+-------------------+------------------+----------+
|2010-01-22 00:00:00|206.78000600000001|    197.75|
|2010-01-28 00:00:00|        204.930004|199.289995|
|2010-01-29 00:00:00|        201.079996|192.060003|
+-------------------+------------------+----------+



In [20]:
# Exact equality on a double column. It matches here because the literal 197.16
# equals the value parsed from the CSV.
# NOTE(review): for computed floats, prefer a tolerance range over ==.
(
    df.where(df['Low'] == 197.16)
    .select('Date', 'Open', 'Close', 'Low')
    .show()
)

+-------------------+------------------+------+------+
|               Date|              Open| Close|   Low|
+-------------------+------------------+------+------+
|2010-01-22 00:00:00|206.78000600000001|197.75|197.16|
+-------------------+------------------+------+------+



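## To work with the result in Python, use the `collect()` method

`collect()` returns the matching rows to the driver as a Python list of `Row` objects. Only use it on results small enough to fit in driver memory.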

In [24]:
# collect() brings every matching row back to the driver as a Python list of
# Row objects, so it should only be used on small results.
result = df.where(df['Low'] == 197.16).collect()

In [27]:
result  # display the whole list returned by collect()

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [28]:
result[0]  # first Row in the list (IndexError if nothing matched)

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [29]:
row = result[0]  # keep the first matching Row for the following cells

In [30]:
# Convert the Row into a plain Python dict keyed by column name.
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [31]:
# Read one field through the dict form. A Row also supports row['Volume']
# directly, which avoids building the intermediate dict.
row.asDict()['Volume']

220441900