In [None]:
### Create a Spark Object

# In Spark 2.0, SparkSession is new entry point to work with RDD, DataFrame and all othere functionalities
# Prior 2.0 SparkContext used to be an entry point 
# 
# Almost all the APIs available in SparkContext, SQLContext, HiveContext are now available in SparkSession
#    SparkContext : Entry point to work with RDD, Accumulators and broadcast variables (< Spark 2.0).
#    SQLContext   : Used for initializing the functionalities of Spark SQL (< spark 2.0).
#    HiveContext  : Super set of SQLContext (< spark 2.0).
# By Default, Spark Shell provides a "spark" object which is an instance of SparkSession class

from pyspark.sql import SparkSession

#spark = SparkSession \
#        .builder \
#        .master('yarn') \
#        .appName("Python Spark SQL basic example") \
#        .getOrCreate()

spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("Python Spark SQL basic example") \
        .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 5)

print("Spark Object id created ...")

In [None]:
print("Number of partitions for shuffle changed to : " + str(spark.conf.get('spark.sql.shuffle.partitions')))

Ways to create RDD :
    ✓ External Data (HDFS, local etc)

In [None]:
##### Practice 1

# Create RDD using textFile API
rdd = spark.sparkContext.textFile('work/data/PracticeFiles/Customers')
rdd.take(5)
for i in rdd.take(5): print(i)

# Get the Number of Partitions in the RDD
Partition_Number = rdd.getNumPartitions()
print(Partition_Number)

# Get the Number of elements in each partition
rdd.glom().map(len).collect()

In [None]:
##### Practice 2

# Create RDD using textFile API and a defined number of partitions
rdd = spark.sparkContext.textFile('work/data/PracticeFiles/Customers',10)

# Get the Number of Partitions in the RDD
print(rdd.getNumPartitions())

# Get the Number of elements in each partition
rdd.glom().map(len).collect()

Ways to create RDD :
    ✓ Local Data
    ✓ Python List/Parallelized Collections
    ✓ Other RDDs
    ✓ Existing DataFrame

In [None]:
##### Practice -1

# Create a RDD from a Python List

lst = [1,2,3,4,5,6,7]
rdd = spark.sparkContext.parallelize(lst)
for i in rdd.take(5) : print(i)

In [None]:
rdd.collect()

In [None]:
help(spark.sparkContext.parallelize)

In [None]:

##### Practice -2

# Create a RDD from local file

lst = open('work/data/PracticeFiles/Customers/part-00000').read().splitlines()
lst[0:10]
rdd = spark.sparkContext.parallelize(lst)
for i in rdd.take(5) : print(i)

In [None]:
##### Practice -3

# Create RDD from range function

lst1 = range(10)
rdd = spark.sparkContext.parallelize(lst1)
for i in rdd.take(5) : print(i)

In [None]:
##### Practice -4

# Create RDD from a DataFrame

df=spark.createDataFrame(data=(('robert',35),('Mike',45)),schema=('name','age'))
df.printSchema()
df.show()

rdd1= df.rdd
print(type(rdd1))
print('-'*30)

for i in rdd1.take(2) : print(i)

------------------------------------------
        Low Level Transformations
        (map, flatMap, filter)
------------------------------------------

# map : map(f, preservesPartitioning=False) 
        ▪ Perform row level transformations where one record transforms into another record.
        ▪ Number of records in input is equal to output.
        ▪ Return a new RDD by applying a function to each element of this RDD.
        ▪ When we apply a map function to an RDD, a pipelineRDD is formed, 
          a subclass of RDD. It has all the APIs defined in the RDD.

In [None]:
# Files used in this Chapter:

'''
orders
ordItems

Please download the orders and ordItems files from Section 2 (Resources).

ord = sc.textFile('practice/retail_db/orders')
ordItems = sc.textFile('practice/retail_db/order_items')
'''

# In spark 2.0 and onwards, we can create the a spark object using SparkSession class. 
# Then using this object we can access the SparkContext
sc = spark.sparkContext

# Load the Files
ord = sc.textFile("work/data/PracticeFiles/Orders")
ordItems  = sc.textFile("work/data/PracticeFiles/Order_items")

# Map Function

In [None]:
##### Practice -1
#   PS: Project all the Order_ids.

ordMap = ord.map(lambda x : x.split(','))
for i in ordMap.take(5) : print(i)

In [None]:
ordItems = ordItems.map(lambda x : x.split(','))
for i in ordItems.take(5) : print(i)

In [None]:
ordMap = ord.map(lambda x : x.split(',')[0])
for i in ordMap.take(5) : print(i)

In [None]:
##### Practice -2
#   PS: Project all the Orders and their status.

ordMap = ord.map(lambda x : (x.split(',')[0],x.split(',')[3])).take(5)
for i in ordMap : print(i)

In [None]:
### Practice -3
#   PS: Combine Order id and status with '#'

ordMap = ord.map(lambda x : x.split(',')[0] + '#' + x.split(',')[3]).take(5)
for i in ordMap : print(i)

In [None]:
### Practice -4
#   PS: Convert the Order date into YYYY/MM/DD Format.

ordMap = ord.map(lambda x : x.split(',')[1].split(' ')[0].replace('-','/')).first()
print(ordMap)
# for i in ordMap : print(i)

In [None]:
ordMap = ord.map(lambda x : x.split(',')[1].split(' ')[1].replace(':','/')).first()
print(ordMap)

In [None]:
### Practice -5
#   PS: Create key-value pairs with key as Order id and values as whole records.

ordMap = ord.map(lambda x : (x.split(',')[0],x)).take(5)
for i in ordMap : print(i)

In [None]:
### Practice -6 
#   PS: Project all the Order_item_ids and their subtotal.

#ordItemsMap = ordItems.map(lambda x : (x.split(',')[0],x.split(',')[4]))
#ordItemsMap.take(5)

ordItemsMap = ordItems.map(lambda x : (x.split(',')[0],x.split(',')[4])).take(5)
for i in ordItemsMap : print(i)

In [None]:
### Practice -7
#   PS: Applied user defined function to convert status into lowercase.

def lowerCase(str):
    return str.lower()   

ord.map(lambda x : lowerCase(x.split(',')[3])).take(5)

# flatMap : flatMap(f, preservesPartitioning=False)
    ▪ Return a new RDD by first applying a function to all elements of this RDD, 
      and then flattening the results.
    ▪ Similar to map, but each input item can be mapped to 0 or more output items 
      (so func should return a Seq rather than a single item). 
      Number of records in input is less than or equal to output.

In [37]:
##### Practice -1
#   PS : Word count in orders file.

wordCount = ord.flatMap(lambda x : x.split(',')).map(lambda w : (w, 1)).reduceByKey(lambda x, y : x + y)
for i in wordCount.take(10) : print(i)



('1', 2)
('CLOSED', 7556)
('256', 11)
('12111', 7)
('4', 7)
('11318', 7)
('7130', 8)
('8', 9)
('2911', 7)
('9', 7)


                                                                                

In [39]:
cf = ord.flatMap(lambda x : x.split(','))#.map(lambda w : (w, 1)).reduceByKey(lambda x, y : x + y)
cf.take(6)

['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED', '2', '2013-07-25 00:00:00.0']

In [40]:
cf = ord.map(lambda x : x.split(','))
cf.take(6)

[['1', '2013-07-25 00:00:00.0', '11599', 'CLOSED'],
 ['2', '2013-07-25 00:00:00.0', '256', 'PENDING_PAYMENT'],
 ['3', '2013-07-25 00:00:00.0', '12111', 'COMPLETE'],
 ['4', '2013-07-25 00:00:00.0', '8827', 'CLOSED'],
 ['5', '2013-07-25 00:00:00.0', '11318', 'COMPLETE'],
 ['6', '2013-07-25 00:00:00.0', '7130', 'COMPLETE']]

In [43]:
##### Practice -1
#   PS: Print all the orders which are closed or Complete and ordered in the year 2013.
#   Return a new dataset formed by selecting those elements of the source on which func returns true.

filteredOrd = ord.filter(lambda x : (x.split(',')[3] in ("CLOSED","COMPLETE"))  \
                                and (x.split(',')[1].split('-')[0] == '2014'))
filteredOrd.take(5)

['25882,2014-01-01 00:00:00.0,4598,COMPLETE',
 '25888,2014-01-01 00:00:00.0,6735,COMPLETE',
 '25889,2014-01-01 00:00:00.0,10045,COMPLETE',
 '25891,2014-01-01 00:00:00.0,3037,CLOSED',
 '25895,2014-01-01 00:00:00.0,1044,COMPLETE']

In [35]:
##### Practice -1
rdd = sc.parallelize((("a", (1,2,3)), ("b", (3,4,5)),("a", (1,2,3,4,5))))
def f(x): return len(x)
rdd.mapValues(f).collect()


[('a', 3), ('b', 3), ('a', 5)]