In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain the SparkSession for this notebook (an already-running session is
# reused if one exists); the application name is 'opts'.
spark = (
    SparkSession.builder
    .appName('opts')
    .getOrCreate()
)

In [4]:
# Read the stock CSV into a DataFrame: the first line is used as the header
# row and Spark infers each column's type from the data.
df = spark.read.csv(
    'files/appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Inspect the column names and the data type Spark inferred for each column.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [11]:
# SQL versus DataFrame methods, step by step -- the SQL version first.
# Register the DataFrame as a temporary view so it can be queried by name.
df.createOrReplaceTempView('stock')

# The column is written as `Close`, exactly as it appears in the schema, so the
# query does not depend on Spark's case-insensitive identifier resolution
# (spark.sql.caseSensitive defaults to false).
results = spark.sql('SELECT Open FROM stock WHERE Close > 500 LIMIT 5')

results.show()

+------------------+
|              Open|
+------------------+
|        499.529991|
|        504.659988|
|        491.500008|
|        503.109993|
|506.88001299999996|
+------------------+



In [14]:
# DataFrame-method equivalent of the SQL query: keep the rows with Close > 500,
# project the Open column, and show the first five rows.
# The filter runs before the projection because `Close` is not a column of
# df.select('Open'); filtering first keeps the column in scope instead of
# relying on Spark to recover it from the parent plan.
(
    df.filter('Close > 500')
    .select('Open')
    .limit(5)
    .show()
)

+------------------+
|              Open|
+------------------+
|        499.529991|
|        504.659988|
|        491.500008|
|        503.109993|
|506.88001299999996|
+------------------+



In [27]:
# Filtering on several conditions using Column expressions.
# Each comparison must be wrapped in parentheses because Python's & and |
# bind more tightly than > and <. The logical operators are:
#   | (or)   & (and)   ~ (not)
# The filter is applied before the projection so that `Close` is still a
# column of the DataFrame being filtered.
(
    df.filter((df['Close'] > 500) & (df['Open'] > 507))
    .select(df['Open'])
    .limit(5)
    .show()
)

+-----------------+
|             Open|
+-----------------+
|       513.079994|
|       515.079987|
|519.6699980000001|
|       521.309982|
|       527.960014|
+-----------------+



In [28]:
# Instead of printing the rows with show(), collect() returns them to the
# driver as a list of Row objects that can be used from Python code.
# No select() is needed here: filter() keeps every column of df.
result = df.filter((df['Close'] > 500) & (df['Open'] > 507)).limit(5).collect()

In [29]:
# Pick the first Row from the collected list.
row = result[0]

In [30]:
# Convert the Row into a plain Python dict keyed by column name.
row.asDict()

{'Date': datetime.datetime(2012, 2, 22, 0, 0),
 'Open': 513.079994,
 'High': 515.489983,
 'Low': 509.07002300000005,
 'Close': 513.039993,
 'Volume': 120825600,
 'Adj Close': 66.46923100000001}

In [32]:
# Access a single value. A Row can be indexed by column name directly, so
# there is no need to build a dict first just to read one field.
row['Volume']

120825600