In [1]:
import findspark
findspark.init()
import pyspark
import datetime

sc = pyspark.SparkContext(appName="read-big-file")

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .getOrCreate()


In [2]:
from pyspark.sql.types import *

# Our schema for the data
#
dbSchema = StructType([\
StructField("stock", StringType()),\
StructField("price",DoubleType()),\
StructField("date",StringType()),\
StructField("volume",LongType())])

# Our actual data
#
db_list = [\
('IBM',143.91,'2018-08-15',4241500),\
('IBM',145.34,'2018-08-16',5250700),\
('IBM',146.06,'2018-08-17',2678600),\
('IBM',146.38,'2018-08-20',3250700),\
('MSFT',107.66,'2018-08-15',29982800),\
('MSFT',107.64,'2018-08-16',21384300),\
('MSFT',107.58,'2018-08-17',18053800),\
('MSFT',106.65,'2018-08-20',6127595),\
('AAPL',210.24,'2018-08-15',28807600),\
('AAPL',213.32,'2018-08-16',28500400),\
('AAPL',217.58,'2018-08-17',35050600),\
('AAPL',215.80,'2018-08-20',16455456)\
]

# Convert our list of tuples to a dataframe
#
df = spark.createDataFrame(db_list,schema=dbSchema) 

# Convert the date string to an actual date
#
df=df.withColumn("date",df["date"].cast(DateType()))


In [3]:
df.show()


+-----+------+----------+--------+
|stock| price|      date|  volume|
+-----+------+----------+--------+
|  IBM|143.91|2018-08-15| 4241500|
|  IBM|145.34|2018-08-16| 5250700|
|  IBM|146.06|2018-08-17| 2678600|
|  IBM|146.38|2018-08-20| 3250700|
| MSFT|107.66|2018-08-15|29982800|
| MSFT|107.64|2018-08-16|21384300|
| MSFT|107.58|2018-08-17|18053800|
| MSFT|106.65|2018-08-20| 6127595|
| AAPL|210.24|2018-08-15|28807600|
| AAPL|213.32|2018-08-16|28500400|
| AAPL|217.58|2018-08-17|35050600|
| AAPL| 215.8|2018-08-20|16455456|
+-----+------+----------+--------+



In [4]:
df.createOrReplaceTempView("stock")

In [5]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext.sql("SELECT * FROM stock")

DataFrame[stock: string, price: double, date: date, volume: bigint]

In [6]:
sqlContext.sql('SELECT stock,price,volume,date,rank() over(partition by date order by price desc ) as rnk FROM stock').show()

+-----+------+--------+----------+---+
|stock| price|  volume|      date|rnk|
+-----+------+--------+----------+---+
| AAPL|213.32|28500400|2018-08-16|  1|
|  IBM|145.34| 5250700|2018-08-16|  2|
| MSFT|107.64|21384300|2018-08-16|  3|
| AAPL|217.58|35050600|2018-08-17|  1|
|  IBM|146.06| 2678600|2018-08-17|  2|
| MSFT|107.58|18053800|2018-08-17|  3|
| AAPL|210.24|28807600|2018-08-15|  1|
|  IBM|143.91| 4241500|2018-08-15|  2|
| MSFT|107.66|29982800|2018-08-15|  3|
| AAPL| 215.8|16455456|2018-08-20|  1|
|  IBM|146.38| 3250700|2018-08-20|  2|
| MSFT|106.65| 6127595|2018-08-20|  3|
+-----+------+--------+----------+---+



In [8]:
sqlContext.sql('SELECT stock,dense_rank() over(order by stock) as rnk FROM stock').show()

+-----+---+
|stock|rnk|
+-----+---+
| AAPL|  1|
| AAPL|  1|
| AAPL|  1|
| AAPL|  1|
|  IBM|  2|
|  IBM|  2|
|  IBM|  2|
|  IBM|  2|
| MSFT|  3|
| MSFT|  3|
| MSFT|  3|
| MSFT|  3|
+-----+---+



In [9]:
sqlContext.sql('SELECT stock,price,volume,date,row_number() over(partition by date order by price desc ) as rn FROM stock').show()

+-----+------+--------+----------+---+
|stock| price|  volume|      date| rn|
+-----+------+--------+----------+---+
| AAPL|213.32|28500400|2018-08-16|  1|
|  IBM|145.34| 5250700|2018-08-16|  2|
| MSFT|107.64|21384300|2018-08-16|  3|
| AAPL|217.58|35050600|2018-08-17|  1|
|  IBM|146.06| 2678600|2018-08-17|  2|
| MSFT|107.58|18053800|2018-08-17|  3|
| AAPL|210.24|28807600|2018-08-15|  1|
|  IBM|143.91| 4241500|2018-08-15|  2|
| MSFT|107.66|29982800|2018-08-15|  3|
| AAPL| 215.8|16455456|2018-08-20|  1|
|  IBM|146.38| 3250700|2018-08-20|  2|
| MSFT|106.65| 6127595|2018-08-20|  3|
+-----+------+--------+----------+---+



In [11]:
sqlContext.sql('SELECT stock,price,volume,date,row_number() over(order by stock) as rn FROM stock').show()

+-----+------+--------+----------+---+
|stock| price|  volume|      date| rn|
+-----+------+--------+----------+---+
| AAPL|210.24|28807600|2018-08-15|  1|
| AAPL|213.32|28500400|2018-08-16|  2|
| AAPL|217.58|35050600|2018-08-17|  3|
| AAPL| 215.8|16455456|2018-08-20|  4|
|  IBM|143.91| 4241500|2018-08-15|  5|
|  IBM|145.34| 5250700|2018-08-16|  6|
|  IBM|146.06| 2678600|2018-08-17|  7|
|  IBM|146.38| 3250700|2018-08-20|  8|
| MSFT|107.66|29982800|2018-08-15|  9|
| MSFT|107.64|21384300|2018-08-16| 10|
| MSFT|107.58|18053800|2018-08-17| 11|
| MSFT|106.65| 6127595|2018-08-20| 12|
+-----+------+--------+----------+---+



In [15]:
df2=df

In [16]:
df2.cache()

DataFrame[stock: string, price: double, date: date, volume: bigint]

In [17]:
df2.count()

12

In [18]:
for i in range(8):
    df2=df2.unionAll(df2)
    df2.cache()
    df2.count()
    

In [19]:
df2.count()

3072

In [20]:
df2.createOrReplaceTempView("stock2")

In [21]:
print(datetime.datetime.now())
stock_tmp=sqlContext.sql('select stock,price,volume,date from (SELECT stock,price,volume,date,row_number() over(partition by stock, date order by date desc ) as rn FROM stock2) where rn =1 ')
stock_tmp.show()
print(datetime.datetime.now())

2018-09-15 14:11:23.054895
+-----+------+--------+----------+
|stock| price|  volume|      date|
+-----+------+--------+----------+
| AAPL|213.32|28500400|2018-08-16|
|  IBM|146.38| 3250700|2018-08-20|
| AAPL|210.24|28807600|2018-08-15|
| MSFT|107.58|18053800|2018-08-17|
| AAPL|217.58|35050600|2018-08-17|
|  IBM|145.34| 5250700|2018-08-16|
| AAPL| 215.8|16455456|2018-08-20|
| MSFT|107.66|29982800|2018-08-15|
|  IBM|143.91| 4241500|2018-08-15|
|  IBM|146.06| 2678600|2018-08-17|
| MSFT|106.65| 6127595|2018-08-20|
| MSFT|107.64|21384300|2018-08-16|
+-----+------+--------+----------+

2018-09-15 14:12:04.090202


In [22]:
print(datetime.datetime.now())
df3=df2.dropDuplicates()
df3.show()
print(datetime.datetime.now())



2018-09-15 14:12:20.402340
+-----+------+----------+--------+
|stock| price|      date|  volume|
+-----+------+----------+--------+
|  IBM|143.91|2018-08-15| 4241500|
| MSFT|106.65|2018-08-20| 6127595|
|  IBM|146.06|2018-08-17| 2678600|
| MSFT|107.58|2018-08-17|18053800|
| AAPL| 215.8|2018-08-20|16455456|
| AAPL|217.58|2018-08-17|35050600|
| MSFT|107.66|2018-08-15|29982800|
| MSFT|107.64|2018-08-16|21384300|
|  IBM|145.34|2018-08-16| 5250700|
|  IBM|146.38|2018-08-20| 3250700|
| AAPL|210.24|2018-08-15|28807600|
| AAPL|213.32|2018-08-16|28500400|
+-----+------+----------+--------+

2018-09-15 14:13:00.277209
