In [0]:
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *     # Needed for upper, col and aggregate functions
import pandas as pd
from datetime import datetime, date

In [0]:
pdDF = spark.createDataFrame(pd.DataFrame({ 
	'city': ['Toronto', 'Toronto', 'New York'], 
	'date': [date(2021,3,31),date(2021,4,1),date(2021,4,1)], 
	'temp': [10, 1, 4]}))
pdDF.collect()

Out[2]: [Row(city='Toronto', date=datetime.date(2021, 3, 31), temp=10),
 Row(city='Toronto', date=datetime.date(2021, 4, 1), temp=1),
 Row(city='New York', date=datetime.date(2021, 4, 1), temp=4)]

In [0]:
rowDF = df = spark.createDataFrame([
	Row(city='Toronto', date=date(2021,3,31), temp=10),
	Row(city='Toronto', date=date(2021,4,1), temp=1),
	Row(city='New York', date=date(2021,4,1), temp=4)])
rowDF.collect()

Out[3]: [Row(city='Toronto', date=datetime.date(2021, 3, 31), temp=10),
 Row(city='Toronto', date=datetime.date(2021, 4, 1), temp=1),
 Row(city='New York', date=datetime.date(2021, 4, 1), temp=4)]

In [0]:
schemaDF = spark.createDataFrame([('Toronto', date(2021,3,31), 10),
                                  ('Toronto', date(2021,4,1), 1),
                                  ('New York', date(2021,4,1), 4)],
 					schema='city string, date date, temp int')
schemaDF.collect()

Out[4]: [Row(city='Toronto', date=datetime.date(2021, 3, 31), temp=10),
 Row(city='Toronto', date=datetime.date(2021, 4, 1), temp=1),
 Row(city='New York', date=datetime.date(2021, 4, 1), temp=4)]

In [0]:
schema = StructType([StructField('city', StringType(), False),
                     StructField('date', DateType(), False),
                     StructField('temp', IntegerType(), False)])

data = [['Toronto', date(2021,3,31), 10],
		['Toronto', date(2021,4,1), 1],
		['New York', date(2021,4,1), 4]]

structDF = spark.createDataFrame(data,schema)

structDF.collect()


Out[5]: [Row(city='Toronto', date=datetime.date(2021, 3, 31), temp=10),
 Row(city='Toronto', date=datetime.date(2021, 4, 1), temp=1),
 Row(city='New York', date=datetime.date(2021, 4, 1), temp=4)]

In [0]:
apple = spark.read.csv('/FileStore/tables/appl_daily.csv', inferSchema=True, header=True)
facebook = spark.read.options(inferSchema=True, header=True).csv('/FileStore/tables/face_daily.csv')
netflix = spark.read.options(inferSchema=True, header=True).load('/FileStore/tables/nflx_daily.csv', 'csv')

apple.show(5)
facebook.show(5)
netflix.show(5)


+----------+----------+----------+----------+----------+--------+----------+
|      Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+----------+----------+----------+----------+----------+--------+----------+
|2015-07-09|123.849998|124.059998|119.220001|    120.07|77821600|    120.07|
|2015-07-08|124.480003|124.639999|122.540001|    122.57|60490200|    122.57|
|2015-07-07|125.889999|126.150002|123.769997|125.690002|46716100|125.690002|
|2015-07-06|124.940002|126.230003|124.849998|     126.0|27900200|     126.0|
|2015-07-02|    126.43|126.690002|125.769997|126.440002|27122500|126.440002|
+----------+----------+----------+----------+----------+--------+----------+
only showing top 5 rows

+----------+----------+----------+----------+----------+--------+----------+
|      Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+----------+----------+----------+----------+----------+--------+----------+
|2015-12-11|104.150002|104.339996|101.910004|102.12

In [0]:
apple.write.save('/FileStore/DemoData/output/mixed_daily.json', format='json')

mixed = sc.textFile('/FileStore/DemoData/output/mixed_daily.json')

mixed.take(5)

Out[7]: ['{"Date":"2015-07-09","Open":123.849998,"High":124.059998,"Low":119.220001,"Close":120.07,"Volume":77821600,"Adj Close":120.07}',
 '{"Date":"2015-07-08","Open":124.480003,"High":124.639999,"Low":122.540001,"Close":122.57,"Volume":60490200,"Adj Close":122.57}',
 '{"Date":"2015-07-07","Open":125.889999,"High":126.150002,"Low":123.769997,"Close":125.690002,"Volume":46716100,"Adj Close":125.690002}',
 '{"Date":"2015-07-06","Open":124.940002,"High":126.230003,"Low":124.849998,"Close":126.0,"Volume":27900200,"Adj Close":126.0}',
 '{"Date":"2015-07-02","Open":126.43,"High":126.690002,"Low":125.769997,"Close":126.440002,"Volume":27122500,"Adj Close":126.440002}']

In [0]:
mixed.count()

Out[8]: 8718

In [0]:
facebook.write.json('/FileStore/DemoData/output/mixed_daily.json', mode='ignore')

mixed = spark.read.json('/FileStore/DemoData/output/mixed_daily.json')

mixed.count()

Out[9]: 8718

In [0]:
netflix.write.mode('append').json('/FileStore/DemoData/output/mixed_daily.json')

mixed = spark.read.json('/FileStore/DemoData/output/mixed_daily.json')

mixed.count()

Out[12]: 3414

In [0]:
dbutils.fs.rm('/FileStore/DemoData/output/mixed_daily.json', recurse=True)

Out[13]: True

In [0]:
# Note that the truncate option doesn't really affect the query in this case, as the number of columns is not very high
apple.show(5, truncate=True)     

apple.show(5, vertical=True)

# Note that print is needed, otherwise only the last statement's output would remain at the end
print(apple.limit(5).collect())
print()
print(apple.take(5))
print()
print(apple.tail(5))

+----------+----------+----------+----------+----------+--------+----------+
|      Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+----------+----------+----------+----------+----------+--------+----------+
|2015-07-09|123.849998|124.059998|119.220001|    120.07|77821600|    120.07|
|2015-07-08|124.480003|124.639999|122.540001|    122.57|60490200|    122.57|
|2015-07-07|125.889999|126.150002|123.769997|125.690002|46716100|125.690002|
|2015-07-06|124.940002|126.230003|124.849998|     126.0|27900200|     126.0|
|2015-07-02|    126.43|126.690002|125.769997|126.440002|27122500|126.440002|
+----------+----------+----------+----------+----------+--------+----------+
only showing top 5 rows

-RECORD 0---------------
 Date      | 2015-07-09 
 Open      | 123.849998 
 High      | 124.059998 
 Low       | 119.220001 
 Close     | 120.07     
 Volume    | 77821600   
 Adj Close | 120.07     
-RECORD 1---------------
 Date      | 2015-07-08 
 Open      | 124.480003 
 High  

In [0]:
# The next two functions are used to set parameters that are used in the execution
# When dealing with small data, you may not notice a difference
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 5)


In [0]:
print(apple.columns)
print()
print(apple.printSchema())

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

None


In [0]:
# apple.select('Date','Low').show()
apple.select(apple.Date,apple.Low).show()

+----------+----------+
|      Date|       Low|
+----------+----------+
|2015-07-09|119.220001|
|2015-07-08|122.540001|
|2015-07-07|123.769997|
|2015-07-06|124.849998|
|2015-07-02|125.769997|
|2015-07-01|125.989998|
|2015-06-30|124.860001|
|2015-06-29|124.480003|
|2015-06-26|126.510002|
|2015-06-25|     127.5|
|2015-06-24|127.120003|
|2015-06-23|126.879997|
|2015-06-22|127.080002|
|2015-06-19|126.400002|
|2015-06-18|127.220001|
|2015-06-17|126.739998|
|2015-06-16|126.370003|
|2015-06-15|125.709999|
|2015-06-12|127.110001|
|2015-06-11|128.479996|
+----------+----------+
only showing top 20 rows



In [0]:
pdDF.withColumn('CITY', upper(col('city'))).show()

pdDF.show()

+--------+----------+----+
|    CITY|      date|temp|
+--------+----------+----+
| TORONTO|2021-03-31|  10|
| TORONTO|2021-04-01|   1|
|NEW YORK|2021-04-01|   4|
+--------+----------+----+

+--------+----------+----+
|    city|      date|temp|
+--------+----------+----+
| Toronto|2021-03-31|  10|
| Toronto|2021-04-01|   1|
|New York|2021-04-01|   4|
+--------+----------+----+



In [0]:
pdDF.withColumn('upper_city', upper(col('city'))).show()

pdDF.show()

+--------+----------+----+----------+
|    city|      date|temp|upper_city|
+--------+----------+----+----------+
| Toronto|2021-03-31|  10|   TORONTO|
| Toronto|2021-04-01|   1|   TORONTO|
|New York|2021-04-01|   4|  NEW YORK|
+--------+----------+----+----------+

+--------+----------+----+
|    city|      date|temp|
+--------+----------+----+
| Toronto|2021-03-31|  10|
| Toronto|2021-04-01|   1|
|New York|2021-04-01|   4|
+--------+----------+----+



In [0]:
apple.filter(apple.Low<120)\
     .select('Date', 'High', 'Low')\
     .show(5)

apple.where(col('LOW')<120)\
     .select('Date', 'High', 'Low')\
     .show(5)

+----------+----------+----------+
|      Date|      High|       Low|
+----------+----------+----------+
|2015-07-09|124.059998|119.220001|
|2015-02-09|119.839996|    118.43|
|2015-02-06|    120.25|118.449997|
|2015-02-05|120.230003|    119.25|
|2015-02-04|120.510002|118.309998|
+----------+----------+----------+
only showing top 5 rows

+----------+----------+----------+
|      Date|      High|       Low|
+----------+----------+----------+
|2015-07-09|124.059998|119.220001|
|2015-02-09|119.839996|    118.43|
|2015-02-06|    120.25|118.449997|
|2015-02-05|120.230003|    119.25|
|2015-02-04|120.510002|118.309998|
+----------+----------+----------+
only showing top 5 rows



In [0]:
apple.agg(avg('High')).show()
apple.agg({'High': 'count'}).show()     # counts distinct high's

+------------------+
|         avg(High)|
+------------------+
|100.30210115026331|
+------------------+

+-----------+
|count(High)|
+-----------+
|       8718|
+-----------+



In [0]:
import datetime

#apple.agg(min('Date'))

#apple.agg(min('Date')).first()

#apple.agg(min('Date')).first()[0]

date.fromisoformat(apple.agg(min('Date')).first()[0])

Out[29]: datetime.date(1980, 12, 12)

In [0]:
print(date.fromisoformat(apple.agg(min('Date')).first()[0]) < datetime.date(1981, 1, 1))

print(date.fromisoformat(apple.agg(min('Date')).first()[0]) < datetime.date(1979, 1, 1))

True
False


In [0]:
pdDF.groupBy('city').max('temp').show()

+--------+---------+
|    city|max(temp)|
+--------+---------+
| Toronto|       10|
|New York|        4|
+--------+---------+



In [0]:
pdDF.groupBy('city')\
    .agg(min('temp').alias('min_t'))\
    .orderBy(['min_t', 'city'], ascending=[False, True])\
    .show()

+--------+-----+
|    city|min_t|
+--------+-----+
|New York|    4|
| Toronto|    1|
+--------+-----+



In [0]:
# Just checking what type of data group by produces
pdDF.groupBy('city')

Out[33]: <pyspark.sql.group.GroupedData at 0x7fbca968ff10>

In [0]:
apple.orderBy(['volume','high'], ascending=[True,False]).show()

+----------+---------+---------+---------+---------+-------+---------+
|      Date|     Open|     High|      Low|    Close| Volume|Adj Close|
+----------+---------+---------+---------+---------+-------+---------+
|1985-09-27|    15.88|     16.0|    15.88|    15.88| 250400| 0.243138|
|1983-01-11| 28.74984|29.500241| 28.74984|29.125039| 347200| 0.445933|
|1992-07-27|45.750039|46.499879|45.249961|45.249961| 599200|  1.45493|
|1981-05-18|     28.0|28.249759|     28.0|     28.0|1041600| 0.428707|
|1981-05-12|27.375041| 27.75024|27.375041|27.375041|1064000| 0.419139|
|1981-05-15| 27.49992|27.875121| 27.49992| 27.49992|1226400| 0.421051|
|1981-05-13|27.375041|27.624799| 27.25016| 27.25016|1226400| 0.417226|
|1981-05-14| 27.12528| 27.12528|26.874959|26.874959|1232000| 0.411482|
|1981-03-05| 26.00024| 26.00024|25.874799|25.874799|1344000| 0.396168|
|1981-04-14|27.875121|     28.0|27.875121|27.875121|1663200| 0.426795|
|1981-03-25|26.374879|26.374879|26.125121|26.125121|1764000| 0.400001|
|1981-

In [0]:
apple.join(netflix, apple.Date==netflix.Date)\
     .filter(apple.High<netflix.High)\
     .count()

Out[36]: 622

In [0]:
apple.join(facebook, apple.Open==facebook.Open, how = 'outer')\
     .select(apple.Date, facebook.Date, apple.Open, facebook.Open)\
     .show(5)

# Note: join on two columns doesn't seem to work. Here is a workaround (but then we invalidate the effect of the "outer" join):
apple.join(facebook, apple.Open==facebook.Open, how = 'outer')\
     .filter(apple.Close==facebook.Close)\
     .select(apple.Date, apple.Open)\
    .show(5)

+----------+----+--------+----+
|      Date|Date|    Open|Open|
+----------+----+--------+----+
|1982-07-08|null|11.12496|null|
|1982-07-09|null|11.37472|null|
|1982-07-07|null|11.50016|null|
|1982-07-12|null|11.62504|null|
|1982-07-06|null|11.62504|null|
+----------+----+--------+----+
only showing top 5 rows

+----+----+
|Date|Open|
+----+----+
+----+----+



In [0]:
print(apple.columns)
print()
print(netflix.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


In [0]:
apple.describe().show()

apple.summary().show()

apple.select('Open', 'Close').summary().show()

apple.summary('min', '25%', '75%', 'max').show()

+-------+----------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|      Date|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+----------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|      8718|              8718|              8718|              8718|             8718|               8718|              8718|
|   mean|      null| 99.05613165634311|100.30210115026331|  97.6972053953887|99.00993265657273|9.270160980729525E7|13.884593893209434|
| stddev|      null|139.93158609425606|141.10321542925595|138.53629809278098|139.8444366333735|8.896064891298278E7| 26.84995722604233|
|    min|1980-12-12|          11.12496|          11.12496|          11.00008|         11.00008|             250400|          0.168422|
|    max|2015-07-09|        702.410042|        705.0700