In [1]:
from pyspark.sql import SparkSession

In [6]:
# Start (or reuse) a Spark session for this notebook, registered under the app name 'basicop'
spark = (
    SparkSession.builder
    .appName('basicop')
    .getOrCreate()
)

In [8]:
# Load the stock CSV: the first row supplies the column names and Spark infers each column's type
df = spark.read.csv(
    'appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [9]:
# Print each column's name, inferred type and nullability
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [10]:
# Preview the data as a text table (show() prints only the top 20 rows by default)
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [11]:
# Filter on a single column using a SQL-style condition string: keep rows where Close is below 100
df.filter('Close < 100').show()

+-------------------+-----------------+---------+---------+-----------------+---------+-----------------+
|               Date|             Open|     High|      Low|            Close|   Volume|        Adj Close|
+-------------------+-----------------+---------+---------+-----------------+---------+-----------------+
|2014-06-09 00:00:00|        92.699997|93.879997|    91.75|        93.699997| 75415000|        88.906324|
|2014-06-10 00:00:00|        94.730003|95.050003|    93.57|            94.25| 62777000|        89.428189|
|2014-06-11 00:00:00|        94.129997|94.760002|93.470001|        93.860001| 45681000|        89.058142|
|2014-06-12 00:00:00|        94.040001|94.120003|91.900002|        92.290001| 54749000|        87.568463|
|2014-06-13 00:00:00|        92.199997|92.440002|90.879997|        91.279999| 54525000|        86.610132|
|2014-06-16 00:00:00|        91.510002|    92.75|91.449997|        92.199997| 35561000|        87.483064|
|2014-06-17 00:00:00|        92.309998|92.6999

In [13]:
# Filter rows where Close is below 100, then keep only the Open column of the matching rows
df.filter('Close < 100').select('Open').show()

+-----------------+
|             Open|
+-----------------+
|        92.699997|
|        94.730003|
|        94.129997|
|        94.040001|
|        92.199997|
|        91.510002|
|        92.309998|
|        92.269997|
|        92.290001|
|        91.849998|
|            91.32|
|            90.75|
|        90.209999|
|        90.370003|
|            90.82|
|        92.099998|
|        93.519997|
|        93.870003|
|93.66999799999999|
|        94.139999|
+-----------------+
only showing top 20 rows



In [14]:
# Select multiple columns: pass a list of column names to select().
# The filter refers to 'Close' exactly as it is spelled in the schema. Spark resolves column
# names case-insensitively by default, so a lowercase 'close' also works, but it would fail
# with spark.sql.caseSensitive enabled and does not match the spelling used in the other cells.
df.filter('Close < 100').select(['Open', 'Close']).show()

+-----------------+-----------------+
|             Open|            Close|
+-----------------+-----------------+
|        92.699997|        93.699997|
|        94.730003|            94.25|
|        94.129997|        93.860001|
|        94.040001|        92.290001|
|        92.199997|        91.279999|
|        91.510002|        92.199997|
|        92.309998|92.08000200000001|
|        92.269997|            92.18|
|        92.290001|        91.860001|
|        91.849998|        90.910004|
|            91.32|90.83000200000001|
|            90.75|        90.279999|
|        90.209999|        90.360001|
|        90.370003|        90.900002|
|            90.82|        91.980003|
|        92.099998|            92.93|
|        93.519997|        93.519997|
|        93.870003|        93.480003|
|93.66999799999999|        94.029999|
|        94.139999|        95.970001|
+-----------------+-----------------+
only showing top 20 rows



In [15]:
df.filter(df["High"] > 100).show() # filter with a Python Column expression instead of a SQL string

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [16]:
# Multiple filter conditions: wrap each comparison in parentheses and combine them with `&`
(
    df.filter((df['Close'] > 200) & (df['Open'] < 200))
    .select(["Open", "Close"])
    .show()
)

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|198.109995|200.37999299999998|
|198.229998|            200.66|
|197.380005|        202.000004|
+----------+------------------+



In [17]:
# show() only prints rows; it does not give us anything to reuse. In real workloads we usually
# want to process the result further, so collect() is used to bring the matching rows back to
# the driver as a list of Row objects, which we keep in a variable.
# NOTE: collect() loads every matching row into driver memory, so use it only on small results.
result = (
    df.filter((df['Close'] > 200) & (df['Open'] < 200))
    .select(["Open", "Close"])
    .collect()
)
result

[Row(Open=198.109995, Close=200.37999299999998),
 Row(Open=198.229998, Close=200.66),
 Row(Open=197.380005, Close=202.000004)]

In [19]:
# The query returns three rows as a plain Python list, so indexing with [0] gives the first Row
(
    df.filter((df['Close'] > 200) & (df['Open'] < 200))
    .select(["Open", "Close"])
    .collect()[0]
)

Row(Open=198.109995, Close=200.37999299999998)

In [20]:
# Convert the first Row into a regular dictionary keyed by column name
(
    df.filter((df['Close'] > 200) & (df['Open'] < 200))
    .select(["Open", "Close"])
    .collect()[0]
    .asDict()
)

{'Open': 198.109995, 'Close': 200.37999299999998}

In [21]:
# Look up a single value (the Open column) in the first row's dictionary
(
    df.filter((df['Close'] > 200) & (df['Open'] < 200))
    .select(["Open", "Close"])
    .collect()[0]
    .asDict()["Open"]
)

198.109995