In [1]:
#basic operations with spark dataframes
from pyspark.sql import SparkSession

In [2]:
# Start the Spark session for this notebook (getOrCreate reuses an active one if present)
spark = (
    SparkSession.builder
    .appName("Operations")
    .getOrCreate()
)

In [3]:
# Load the Apple stock CSV: the first line supplies the column names and
# Spark infers each column's type from the data.
appleDF = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("///apple_stock.csv")
)

In [4]:
# Print each column's name, inferred type and nullability as a tree
appleDF.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [5]:
# Filter with a SQL expression string: keep rows whose Close is below 500
# and preview the first three matches.
(
    appleDF
    .where("Close<500")
    .show(3)
)


+----------+----------+----------+------------------+----------+---------+------------------+
|      Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|
+----------+----------+----------+------------------+----------+---------+------------------+
only showing top 3 rows



In [6]:
# SQL-string filter followed by a projection onto the Open column only;
# show() with no argument prints at most the first 20 rows.
(
    appleDF
    .where("Close<500")
    .select("Open")
    .show()
)

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [7]:
# Filter with a SQL string, keep the Open and Close columns, preview five rows
(
    appleDF
    .where("Close<500")
    .select("Open", "Close")
    .show(5)
)

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|213.429998|        214.009998|
|214.599998|        214.379993|
|214.379993|        210.969995|
|    211.75|            210.58|
|210.299994|211.98000499999998|
+----------+------------------+
only showing top 5 rows



In [8]:
# Filter with a Column expression built in Python instead of a SQL string
close_under_200 = appleDF["Close"] < 200
appleDF.where(close_under_200).show(5)

+----------+------------------+----------+------------------+----------+---------+------------------+
|      Date|              Open|      High|               Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+------------------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|            197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|        198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|        190.250002|192.060003|311488100|         24.883208|
|2010-02-01|192.36999699999998|     196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|196.319994|193.37999299999998|195.859997|174585600|25.375532999999997|
+----------+------------------+----------+------------------+----------+---------+------------------+
only showing top 5 rows



In [9]:
# Combine two Column conditions with & (logical AND): closed below 200 but
# opened above 200. Each comparison is built separately before combining.
close_under_200 = appleDF["Close"] < 200
open_over_200 = appleDF["Open"] > 200
appleDF.where(close_under_200 & open_over_200).show(5)

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [10]:
# Combine two Column conditions with | (logical OR): either the close or the
# open was above 200.
close_over_200 = appleDF["Close"] > 200
open_over_200 = appleDF["Open"] > 200
appleDF.where(close_over_200 | open_over_200).show(5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [11]:
# Negate a Column condition with ~ (logical NOT): closed below 200 while the
# open was NOT below 200.
close_under_200 = appleDF["Close"] < 200
open_under_200 = appleDF["Open"] < 200
appleDF.where(close_under_200 & ~open_under_200).show()

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [12]:
# Select the rows whose Low equals 197.16.
# NOTE(review): this is an exact comparison on a double column; it matches here
# because the loaded value is exactly 197.16 — confirm before reusing the
# pattern on computed values.
low_is_197_16 = appleDF["Low"] == 197.16
appleDF.where(low_is_197_16).show()

+----------+------------------+----------+------+------+---------+---------+
|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+----------+------------------+----------+------+------+---------+---------+
|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+----------+------------------+----------+------+------+---------+---------+



In [13]:
# collect() returns the matching rows as a Python list of Row objects
# instead of printing a table
low_is_197_16 = appleDF["Low"] == 197.16
appleDF.where(low_is_197_16).collect()

[Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [14]:
# Keep the collected rows in `results` for the following cells, then inspect
# the type of the first element
low_is_197_16 = appleDF["Low"] == 197.16
results = appleDF.where(low_is_197_16).collect()
type(results[0])

pyspark.sql.types.Row

In [15]:
# Take the first collected Row and view it as a plain dict keyed by column name
row = results[0]
row.asDict()

{'Date': '2010-01-22',
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [16]:
# A Row can be iterated directly: print each field value on its own line,
# in column order
first_row = results[0]
for item in first_row:
    print(item)

2010-01-22
206.78000600000001
207.499996
197.16
197.75
220441900
25.620401
