In [1]:
import os
import sys

os.environ["SPARK_HOME"] = "/usr/spark2.4.3"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
conf = SparkConf().setAppName("appName")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)


In [3]:
spark

In [4]:
myRange = spark.range(1000).toDF("number")


In [5]:
divisBy2 = myRange.where("number % 2 = 0")


In [6]:
divisBy2.count()

500

In [7]:
#flightdata2015 is a datadframe
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("file:/home/wilsonsagar8680/tinku/fl2015.csv")

In [8]:
flightData2015.take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=252)]

In [9]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#22 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#22 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#20,ORIGIN_COUNTRY_NAME#21,count#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/wilsonsagar8680/tinku/fl2015.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [10]:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Togo', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Solomon Islands', count=1)]

In [11]:
#create a table called flight_data_2015
flightData2015.createOrReplaceTempView("flight_data_2015")

In [12]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

In [13]:
dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()

In [14]:
sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/wilsonsagar8680/tinku/fl2015.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/wilsonsagar8680/tinku/fl2015.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


In [15]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=347452)]

In [16]:
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

[Row(max(count)=347452)]

In [17]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           384342|
|           Canada|             8034|
|           Mexico|             5983|
|   United Kingdom|             1852|
|            Japan|             1538|
+-----------------+-----------------+



In [18]:
from pyspark.sql.functions import desc
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "dest_total") \
.sort(desc("dest_total")).limit(5).show()

+-----------------+----------+
|DEST_COUNTRY_NAME|dest_total|
+-----------------+----------+
|    United States|    384342|
|           Canada|      8034|
|           Mexico|      5983|
|   United Kingdom|      1852|
|            Japan|      1538|
+-----------------+----------+



In [19]:
#create a dataframe from all csvs
staticdf = spark.read.format("csv").option("header", "true").option("inferSchema", "true") \
.load("file:/home/wilsonsagar8680/tinku/by-day/*.csv")

In [20]:
#create an sql table called rdtable
staticdf.createOrReplaceTempView("rdtable")

In [21]:
statschema = staticdf.schema

In [22]:
from pyspark.sql.functions import window,column,desc,col
staticdf.selectExpr("CustomerId", "(UnitPrice*Quantity) as totalCost", "InvoiceDate")\
.groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("totalCost").show(5)

+----------+--------------------+------------------+
|CustomerId|              window|    sum(totalCost)|
+----------+--------------------+------------------+
|   14075.0|[2011-12-05 00:00...|316.78000000000003|
|   18180.0|[2011-12-05 00:00...|            310.73|
|   15358.0|[2011-12-05 00:00...| 830.0600000000003|
|   15392.0|[2011-12-05 00:00...|304.40999999999997|
|   15290.0|[2011-12-05 00:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows



In [23]:
#read data as a stream
streamdf = spark.readStream.schema(statschema).option("maxFilesPerTrigger", 1)\
.format("csv").option("header", "true").load("file:/home/wilsonsagar8680/tinku/by-day/*.csv")

In [24]:
streamdf.isStreaming

True

In [25]:
purCusPerHour = streamdf.selectExpr("CustomerId", "(UnitPrice*Quantity) as totalCost", "InvoiceDate")\
.groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("totalCost")

In [26]:
purCusPerHour.writeStream.format("memory").queryName("customerPurchases").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f311038a0b8>

In [29]:
spark.sql("select * from customerPurchases order by 'sum(totalCost)' DESC").show(5)

+----------+--------------------+------------------+
|CustomerId|              window|    sum(totalCost)|
+----------+--------------------+------------------+
|   15237.0|[2011-12-08 00:00...|              83.6|
|   16811.0|[2011-12-05 00:00...|             232.3|
|   12921.0|[2011-03-30 00:00...|-87.30000000000001|
|   17652.0|[2011-03-03 00:00...|             222.3|
|   14506.0|[2011-11-22 00:00...|496.91999999999996|
+----------+--------------------+------------------+
only showing top 5 rows



In [30]:
staticdf.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

