In [1]:
import pandas as pd
from operator import add

In [2]:
# Build (or fetch) the SparkSession for this notebook. getOrCreate() returns the
# already-active session if there is one, so creating a "new" session this way
# still points at the same SparkContext -- one JVM can host only one SparkContext.
# NOTE(review): SparkSession is never imported in this notebook; presumably it is
# provided by the pyspark launcher that started the kernel. On a plain Python
# kernel, add `from pyspark.sql import SparkSession` first.
sc1 = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [3]:
# Load the file as raw text lines. The second argument (minPartitions) asks
# Spark to split the data into at least 3 partitions.
a = sc1.sparkContext.textFile("sample_data.csv", minPartitions=3)

# A file can be loaded in two ways:
#  1. Through the SparkContext, as above. textFile() produces an RDD -- Spark's
#     basic data structure -- with one string element per line. This is useful
#     when the file is not in a format Spark can parse natively.
#     RDDs are immutable: an RDD cannot be modified once created, but applying
#     a transformation to it produces a new RDD.
#  2. Through spark.read (optionally configured with .option(...)), which parses
#     CSV and other formats directly into a DataFrame on which SQL-style
#     operations can be run.
# An RDD can also be converted into a DataFrame with toDF().
#
# For now, treat the file as raw text and see how it can be processed.

In [4]:
# Only the last expression of a cell is echoed, so a bare `type(a)` here would be
# evaluated and discarded. Print it explicitly so the type is actually shown.
print(type(a))  # expected: the pyspark RDD class
# The RDD was created with (at least) 3 partitions.
a.getNumPartitions()

3

In [5]:
# Break each line into its columns, using "," as the delimiter.
a1 = a.map(lambda line: line.split(","))

In [6]:
def _fill_missing_amount(fields):
    """Return a (name, amount) pair, defaulting a missing amount to 0.

    `fields` is one line already split on ",". The amount counts as missing when
    the row has no second field at all (e.g. a line without a comma, or a blank
    trailing line) or when that field is empty/whitespace only. Otherwise the
    second field is passed through unchanged (still a string).
    """
    if len(fields) < 2 or not fields[1].strip():
        return (fields[0], 0)
    return (fields[0], fields[1])


# Some rows have no amount; replace those with 0. Guarding on the field count
# avoids an IndexError on short rows, and strip() keeps a whitespace-only value
# from reaching int() in the next step.
a11 = a1.map(_fill_missing_amount)

In [7]:
# The amounts will be summed, so cast the second element of each pair to int.
a12 = a11.map(lambda pair: (pair[0], int(pair[1])))

In [8]:
a12.collect()
# collect() brings every element of the RDD back to the driver, which is handy
# for inspecting a small dataset like this sample.
# Caution: on a large RDD, collect() can exhaust the driver's memory because the
# whole dataset is fetched to a single machine. To look at only a few elements,
# use take() instead, e.g. a12.take(2).
# Let's check the data.

[('Punit', 100),
 ('Punit', 200),
 ('Vishal', 300),
 ('Vishal', 500),
 ('Nitin', 0),
 ('Nitin', 700)]

In [9]:
# Note: map() returns a PipelinedRDD, which is already a subclass of RDD, so it
# needs no "conversion" before RDD methods such as reduceByKey() are used.
# The collect()/parallelize() round trip here only demonstrates building an RDD
# from a local list with an explicit 3 partitions. It pulls all data onto the
# driver, so use it only for small demo data.
# Use sc1, the session created at the top of this notebook, rather than the
# launcher-provided `spark` name, which this notebook never defines. Both refer to
# the same session, because getOrCreate() reuses the active one.
a13 = sc1.sparkContext.parallelize(a12.collect(), 3)

In [10]:
a13.collect()

# Let's check the data: it should match a12 element for element.

[('Punit', 100),
 ('Punit', 200),
 ('Vishal', 300),
 ('Vishal', 500),
 ('Nitin', 0),
 ('Nitin', 700)]

In [11]:
type(a13)  

# Check the type: an RDD built with parallelize() is a plain RDD rather than a PipelinedRDD.

pyspark.rdd.RDD

In [12]:
a13.glom().collect()

# glom() turns each partition into a list, so this shows which rows landed in which partition.

[[('Punit', 100), ('Punit', 200)],
 [('Vishal', 300), ('Vishal', 500)],
 [('Nitin', 0), ('Nitin', 700)]]

In [13]:
# Aggregate per key: the first tuple element is the key, and operator.add merges
# the second elements of all rows that share that key.
a14 = a13.reduceByKey(func=add)

In [14]:
a14.collect()  # one (name, total) pair per distinct name

[('Punit', 300), ('Vishal', 800), ('Nitin', 700)]

In [15]:
# Turn the RDD of (name, total) tuples into a Spark DataFrame. No column names are
# passed, so Spark falls back to positional names (_1, _2, ...).
a15 = a14.toDF()

In [16]:
type(a15) 

# Check the type of a15 now: it is a pyspark.sql DataFrame.
# A Spark DataFrame is not the same as a pandas DataFrame; use .toPandas() to
# convert a small one into pandas if needed.

pyspark.sql.dataframe.DataFrame

In [17]:
a15.show()  # print the DataFrame as a text table (columns still named _1 and _2)

+------+---+
|    _1| _2|
+------+---+
| Punit|300|
|Vishal|800|
| Nitin|700|
+------+---+



In [18]:
# Rename the first column for display only. withColumnRenamed returns a new
# DataFrame, so a15 itself keeps _1, and _2 is left as-is.
(
    a15
    .withColumnRenamed("_1", "Name")
    .show()
)

+------+---+
|  Name| _2|
+------+---+
| Punit|300|
|Vishal|800|
| Nitin|700|
+------+---+



--------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
Now let's read the same file as a CSV instead.

In [19]:
# Read the same file as CSV straight into a DataFrame. It has no header row, so
# the columns get the default names _c0, _c1, ...
# Use sc1 (the session created at the top of this notebook) instead of the
# launcher-provided `spark` name, which this notebook never defines.
b = sc1.read.csv("sample_data.csv")

In [20]:
b.show()  # the missing amount for Nitin shows up as null

+------+----+
|   _c0| _c1|
+------+----+
| Punit| 100|
| Punit| 200|
|Vishal| 300|
|Vishal| 500|
| Nitin|null|
| Nitin| 700|
+------+----+



In [21]:
type(b) 
# We have created a Spark DataFrame. It still runs on RDDs underneath (its RDD of Rows is available as b.rdd), but since Spark 2.0 the DataFrame API via SparkSession is the built-in entry point for this.

pyspark.sql.dataframe.DataFrame

In [22]:
# Replace the default positional column names with meaningful ones.
b1 = (
    b
    .withColumnRenamed("_c0", "name")
    .withColumnRenamed("_c1", "sal")
)

In [23]:
# Replace the missing salary with '0'. The CSV was read without a schema, so the
# columns are strings and the fill value must be a string too.
# Restrict the fill to `sal`: an unrestricted fillna('0') would also turn a
# missing `name` into the bogus name '0'.
b11 = b1.fillna('0', subset=['sal'])

In [24]:
b11.show() # To run SQL on it, first register it as a view, e.g. b11.createOrReplaceTempView("emp")

+------+---+
|  name|sal|
+------+---+
| Punit|100|
| Punit|200|
|Vishal|300|
|Vishal|500|
| Nitin|  0|
| Nitin|700|
+------+---+



In [19]:
# ---------------------------------------------------------------------------------- 

In [1]:
# Spark Streaming fundamentals
# From Spark 2.0 onwards, Spark provides Structured Streaming, an upgrade over DStreams:
# unlike DStreams, it works with the higher-level APIs, DataFrames (Python) and Datasets (Scala).

In [2]:
# Session for the streaming example. getOrCreate() reuses an already-active
# session if there is one; otherwise it starts a new one named "StreamExample1".
# NOTE(review): relies on SparkSession being in scope (not imported in this notebook).
sparkStreaming = SparkSession.builder.appName("StreamExample1").getOrCreate()

In [3]:
# Option 1: watch the "sparkStream" directory for CSV files, picking up at most
# one new file per micro-batch (trigger) because of maxFilesPerTrigger=1.
# File-based streaming sources need an explicit schema (schema inference is off
# by default for streams).
stream_df = (
    sparkStreaming.readStream
    .schema("col0 STRING, col1 INTEGER")
    .option("maxFilesPerTrigger", 1)
    .csv("sparkStream")
)

# Option 2: without maxFilesPerTrigger, each micro-batch reads all new files in
# the directory. Keep the path relative (or in a config constant) rather than
# hard-coding a user-specific absolute path.
# stream_df = (
#     sparkStreaming.readStream
#     .schema("col0 STRING, col1 INTEGER")
#     .csv("sparkStream")
# )

In [4]:
# Running total of col1 for each col0 value.
sql1 = stream_df.groupBy("col0").sum("col1")

# "complete" mode re-emits the whole aggregate table on every trigger. The memory
# sink keeps the result as an in-memory table named after queryName ("stream1"),
# which the polling cell below queries with SQL.
# Swap in .format("console") to print each result on the console instead.
query = (
    sql1.writeStream
    .queryName("stream1")
    .outputMode("complete")
    .format("memory")
    .start()
)

In [None]:
type(query) # the handle returned by start(); SQL access comes from the memory sink, which exposes the results as the in-memory table named by queryName ("stream1")

In [5]:
# Snapshot of what the stream is doing right now (status message, whether data is available, whether a trigger is running).
query.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [6]:
from time import sleep

In [7]:
# Poll the in-memory "stream1" table a few times, so that files dropped into the
# watched directory show up as updated totals.
N_POLLS = 5           # number of snapshots to print
POLL_INTERVAL_S = 10  # seconds to wait between snapshots

for poll in range(N_POLLS):
    sparkStreaming.sql("selECT * from stream1").show()
    if not query.isActive:
        # The query has stopped or failed; further polls would only reprint the
        # same final table. query.exception() holds the error, if there was one.
        break
    if poll < N_POLLS - 1:
        # Don't sleep after the last snapshot.
        sleep(POLL_INTERVAL_S)

+----+---------+
|col0|sum(col1)|
+----+---------+
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       60|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|   

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|      100|
|   z|       10|
|   d|      360|
|   c|       80|
|   b|       60|
|   a|       80|
+----+---------+

+----+---------+
|col0|sum(col1)|
+----+---------+
|   g|       30|
|   f|       20|
|   e|       10|
|   h|    

In [10]:
query.stop()  # stop the stream; a new query named "stream1" cannot start while this one is still active