In [1]:
import os

import findspark

# Point findspark at the Spark installation. SPARK_HOME takes precedence when it is
# set, so the notebook is not tied to one machine's directory layout; otherwise the
# original install path is used as the fallback.
findspark.init(os.environ.get('SPARK_HOME', '/home/spark/spark-2.4.5-bin-hadoop2.6/'))

In [2]:
# Import SparkSession (used below to create the session)
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session for this notebook (or reuse an existing one),
# registered under the application name "Operations".
spark = (
    SparkSession.builder
    .appName("Operations")
    .getOrCreate()
)

In [4]:
# Load the stock CSV into a DataFrame: the first row supplies the column names
# and the column types are inferred from the data.
df = spark.read.csv(
    'appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Print each column's name, inferred type and nullability
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### Filtering Data

In [6]:
# Filter rows with a SQL-style string condition (rows whose Close is above 400)
df.filter('Close>400').show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2011-07-26 00:00:00|        399.999996|        404.499992|399.67998900000003|        403.410007|119145600|         52.265619|
|2011-09-16 00:00:00|395.54000099999996|        400.500008|395.03000299999997|        400.500008|174628300|         51.888601|
|2011-09-19 00:00:00|        397.000008|        413.229992|        395.199993|        411.630009|205965200|         53.330599|
|2011-09-20 00:00:00|        415.250011|        422.860012|        411.189999|        413.449997|193938500|         53.566396|
|2011-09-21 00:00:00|        419.639992|        421.589996|        412.000004|412.14000699999997|151494000|    

In [7]:
# SQL-style string filter, then keep only the Open column with .select()
df.filter("Close<400").select('Open').show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [8]:
# Same filter, selecting several columns at once by passing a list of names
df.filter("Close<400").select(['Open', 'Close', 'High', 'Low']).show()

+------------------+------------------+------------------+------------------+
|              Open|             Close|              High|               Low|
+------------------+------------------+------------------+------------------+
|        213.429998|        214.009998|        214.499996|212.38000099999996|
|        214.599998|        214.379993|        215.589994|        213.249994|
|        214.379993|        210.969995|            215.23|        210.750004|
|            211.75|            210.58|        212.000006|        209.050005|
|        210.299994|211.98000499999998|        212.000006|209.06000500000002|
|212.79999700000002|210.11000299999998|        213.000002|        208.450005|
|209.18999499999998|        207.720001|209.76999500000002|        206.419998|
|        207.870005|        210.650002|210.92999500000002|        204.099998|
|210.11000299999998|            209.43|210.45999700000002|        209.020004|
|210.92999500000002|            205.93|211.59999700000003|      

In [11]:
# Another way of filtering: build the condition from a column object and a Python
# comparison operator instead of a SQL string; show at most 10 matching rows
df.filter(df['Close'] > 600).show(10)

+-------------------+-----------------+-----------------+-----------------+-----------------+---------+---------+
|               Date|             Open|             High|              Low|            Close|   Volume|Adj Close|
+-------------------+-----------------+-----------------+-----------------+-----------------+---------+---------+
|2012-03-19 00:00:00|        598.37001|601.7699809999999|       589.050011|       601.100006|225309000|77.878247|
|2012-03-20 00:00:00|        599.51001|       606.899979|       591.480026|       605.959984|204165500|78.507903|
|2012-03-21 00:00:00|        602.73999|       609.650002|601.4100269999999|602.4999849999999|161010500|78.059627|
|2012-03-26 00:00:00|       599.790016|       607.150024|       595.259979|        606.97998|148935500|78.640054|
|2012-03-27 00:00:00|       606.180016|       616.280006|       606.060013|614.4800190000001|151782400|79.611755|
|2012-03-28 00:00:00|618.3799740000001|       621.450005|610.3099900000001|        617.6

In [13]:
# Collect the matching rows as Python objects (a list of Row objects) instead of
# only displaying them with .show().
# NOTE(review): Low is a double column, so `==` is an exact floating-point match;
# a stored value that differs in any digit will not be selected.
result = df.filter(df['Low']==213.249994).collect()

In [21]:
# collect() hands back a plain Python list
type(result)

list

In [16]:
# Each element of that list is a pyspark Row
type(result[0])

pyspark.sql.types.Row

In [19]:
# Take the first collected row (raises IndexError if the filter matched nothing)
row = result[0]

In [22]:
# View the Row as a dictionary keyed by column name
row.asDict()

{'Adj Close': 27.774976000000002,
 'Close': 214.379993,
 'Date': datetime.datetime(2010, 1, 5, 0, 0),
 'High': 215.589994,
 'Low': 213.249994,
 'Open': 214.599998,
 'Volume': 150476200}

In [24]:
# Keep the dictionary form of the row for the cells that follow
row_dict = row.asDict()

In [28]:
# Display the stored dictionary
row_dict

{'Adj Close': 27.774976000000002,
 'Close': 214.379993,
 'Date': datetime.datetime(2010, 1, 5, 0, 0),
 'High': 215.589994,
 'Low': 213.249994,
 'Open': 214.599998,
 'Volume': 150476200}

In [33]:
# Print every (column name, value) pair of the row, one tuple per line
for column_name, value in row_dict.items():
    print((column_name, value))

('High', 215.589994)
('Adj Close', 27.774976000000002)
('Volume', 150476200)
('Date', datetime.datetime(2010, 1, 5, 0, 0))
('Close', 214.379993)
('Open', 214.599998)
('Low', 213.249994)
