In [1]:
from pyspark import SparkConf

In [2]:
from pyspark import SparkContext

In [9]:
# Reuse the running SparkContext if one exists; otherwise start one with a local[*] master.
spark_conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)

In [10]:
sc

In [11]:
# Spread the integers 0..19 over the context's default partitions.
num = range(20)
rdd = sc.parallelize(num)

In [12]:
rdd.glom().collect()

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]

In [13]:
# NOTE(review): absolute, machine-specific path — this cell only works where the file exists.
input_file_path_1 = "file:///C:/Users/ckp43_000/Documents/generic-food.csv"
# One RDD element per text line; the CSV header line is still part of the data.
food = sc.textFile(input_file_path_1)

In [58]:
food.count()

908

In [17]:
food.getNumPartitions()

2

In [18]:
food.take(5)

['FOODNAME,SCIENTIFICNAME,GROUP,SUBGROUP',
 'Angelica,Angelica keiskei,Herbs and Spices,Herbs',
 'Savoy cabbage,Brassica oleracea var. sabauda,Vegetables,Cabbages',
 'Silver linden,Tilia argentea,Herbs and Spices,Herbs',
 'Kiwi,Actinidia chinensis,Fruits,Tropical fruits']

### Creating Schema for "food" RDD
         First we define a function that is applied to each line of the food RDD; using the separator ","
         it splits the line into a list of its fields.
         Then, using "Row", it builds a Row object from that list.
         Finally, a DataFrame is created from the resulting RDD of Row objects.
         See below:

In [23]:
# Split the first line (the CSV header) into its column names.
# first() returns just that one element instead of fetching ten rows and discarding nine.
food.map(lambda line: line.split(',')).first()

['FOODNAME', 'SCIENTIFICNAME', 'GROUP', 'SUBGROUP']

In [35]:
from pyspark.sql import Row

In [47]:
def food_schema(line):
    """Convert one CSV line of the food file into a ``Row``.

    The line is parsed with the ``csv`` module instead of ``str.split(",")``
    so that a quoted field containing a comma stays in one column rather than
    shifting every column after it.

    Args:
        line: One text line holding at least four comma-separated fields.

    Returns:
        A ``Row`` with the fields ``Foodname``, ``Scientificname``, ``Group``
        and ``Subgroup``, filled from the first four columns in that order.

    Raises:
        IndexError: If the line has fewer than four fields.
    """
    # Function-scope import keeps the function self-contained wherever it is run.
    import csv

    # csv.reader expects an iterable of lines; wrap the single line in a list.
    fields = next(csv.reader([line]), [])
    return Row(
        Foodname=str(fields[0]),
        Scientificname=str(fields[1]),
        Group=str(fields[2]),
        Subgroup=str(fields[3]),
    )

In [60]:
food_withSchema=food.map(food_schema)

In [61]:
food_withSchema.take(3)

[Row(Foodname='FOODNAME', Group='GROUP', Scientificname='SCIENTIFICNAME', Subgroup='SUBGROUP'),
 Row(Foodname='Angelica', Group='Herbs and Spices', Scientificname='Angelica keiskei', Subgroup='Herbs'),
 Row(Foodname='Savoy cabbage', Group='Vegetables', Scientificname='Brassica oleracea var. sabauda', Subgroup='Cabbages')]

In [68]:
food.filter(lambda row:row!="FOODNAME,SCIENTIFICNAME,GROUP,SUBGROUP").take(5)

['Angelica,Angelica keiskei,Herbs and Spices,Herbs',
 'Savoy cabbage,Brassica oleracea var. sabauda,Vegetables,Cabbages',
 'Silver linden,Tilia argentea,Herbs and Spices,Herbs',
 'Kiwi,Actinidia chinensis,Fruits,Tropical fruits',
 'Allium (Onion),Allium,Vegetables,Onion-family vegetables']

In [70]:
# Elements of food_withSchema are Row objects, not raw text lines. A Row never
# equals the header *string*, so the old comparison kept every row (header included).
# Compare the parsed field instead.
food_withSchema.filter(lambda row: row.Foodname != "FOODNAME").take(5)

[Row(Foodname='Angelica', Group='Herbs and Spices', Scientificname='Angelica keiskei', Subgroup='Herbs'),
 Row(Foodname='Savoy cabbage', Group='Vegetables', Scientificname='Brassica oleracea var. sabauda', Subgroup='Cabbages'),
 Row(Foodname='Silver linden', Group='Herbs and Spices', Scientificname='Tilia argentea', Subgroup='Herbs'),
 Row(Foodname='Kiwi', Group='Fruits', Scientificname='Actinidia chinensis', Subgroup='Tropical fruits'),
 Row(Foodname='Allium (Onion)', Group='Vegetables', Scientificname='Allium', Subgroup='Onion-family vegetables')]

In [72]:
type(food_withSchema)

pyspark.rdd.PipelinedRDD

In [73]:
type(food)

pyspark.rdd.RDD

In [64]:
type(food_withSchema.take(2))

list

In [49]:
from pyspark.sql import SparkSession

In [51]:
# Get (or create) the SparkSession used to build DataFrames from the RDDs.
spark = (
    SparkSession.builder
    .appName("Rdd with Schema")
    .getOrCreate()
)

In [62]:
food_df=spark.createDataFrame(food_withSchema)

In [63]:
food_df.show(10)

+--------------+----------------+--------------------+--------------------+
|      Foodname|           Group|      Scientificname|            Subgroup|
+--------------+----------------+--------------------+--------------------+
|      FOODNAME|           GROUP|      SCIENTIFICNAME|            SUBGROUP|
|      Angelica|Herbs and Spices|    Angelica keiskei|               Herbs|
| Savoy cabbage|      Vegetables|Brassica oleracea...|            Cabbages|
| Silver linden|Herbs and Spices|      Tilia argentea|               Herbs|
|          Kiwi|          Fruits| Actinidia chinensis|     Tropical fruits|
|Allium (Onion)|      Vegetables|              Allium|Onion-family vege...|
|  Garden onion|      Vegetables|         Allium cepa|Onion-family vege...|
|          Leek|      Vegetables|       Allium porrum|Onion-family vege...|
|        Garlic|Herbs and Spices|      Allium sativum|               Herbs|
|        Chives|Herbs and Spices|Allium schoenoprasum|               Herbs|
+-----------

#### Informative point:
                       We can apply ".filter" on an RDD as well as on a DataFrame: on an RDD it removes particular lines,
                       and on a DataFrame it removes particular rows.
                       See below:

In [78]:
food.filter(lambda x:x!="FOODNAME,SCIENTIFICNAME,GROUP,SUBGROUP").take(5)

['Angelica,Angelica keiskei,Herbs and Spices,Herbs',
 'Savoy cabbage,Brassica oleracea var. sabauda,Vegetables,Cabbages',
 'Silver linden,Tilia argentea,Herbs and Spices,Herbs',
 'Kiwi,Actinidia chinensis,Fruits,Tropical fruits',
 'Allium (Onion),Allium,Vegetables,Onion-family vegetables']

In [77]:
food_df.filter(food_df.Foodname!='FOODNAME').show(5)

+--------------+----------------+--------------------+--------------------+
|      Foodname|           Group|      Scientificname|            Subgroup|
+--------------+----------------+--------------------+--------------------+
|      Angelica|Herbs and Spices|    Angelica keiskei|               Herbs|
| Savoy cabbage|      Vegetables|Brassica oleracea...|            Cabbages|
| Silver linden|Herbs and Spices|      Tilia argentea|               Herbs|
|          Kiwi|          Fruits| Actinidia chinensis|     Tropical fruits|
|Allium (Onion)|      Vegetables|              Allium|Onion-family vege...|
+--------------+----------------+--------------------+--------------------+
only showing top 5 rows



In [79]:
# NOTE(review): absolute, machine-specific path — this cell only works where the file exists.
input_file_path_2 = "file:///C:/Users/ckp43_000/Documents/flavors.csv"
# One RDD element per text line; the CSV header line is still part of the data.
flavors = sc.textFile(input_file_path_2)

In [80]:
flavors.count()

857

In [83]:
flavors.getNumPartitions()

2

In [84]:
flavors.take(5)

['GROUP,SUB GROUP',
 'celery,vegetable',
 'corn,vegetable',
 'cucumber,vegetable',
 'horseradish,vegetable']

In [85]:
flavors.filter(lambda row:row !="GROUP,SUB GROUP").take(5)

['celery,vegetable',
 'corn,vegetable',
 'cucumber,vegetable',
 'horseradish,vegetable',
 'vegetable,vegetable']

In [86]:
filter_flavor=flavors.filter(lambda x:x!='GROUP,SUB GROUP')

In [87]:
filter_flavor.take(4)

['celery,vegetable',
 'corn,vegetable',
 'cucumber,vegetable',
 'horseradish,vegetable']

In [88]:
def flavor_schema(lines):
    """Convert one CSV line of the flavors file into a ``Row``.

    The line is parsed with the ``csv`` module instead of ``str.split(',')``
    so that a quoted field containing a comma stays in one column.

    Args:
        lines: A single text line (despite the plural name) holding at least
            two comma-separated fields.

    Returns:
        A ``Row`` with the fields ``Group`` and ``Subgroup``, filled from the
        first and second column respectively.

    Raises:
        IndexError: If the line has fewer than two fields.
    """
    # Function-scope import keeps the function self-contained wherever it is run.
    import csv

    # csv.reader expects an iterable of lines; wrap the single line in a list.
    fields = next(csv.reader([lines]), [])
    return Row(Group=str(fields[0]), Subgroup=str(fields[1]))

In [90]:
filter_flavor.map(flavor_schema).take(5)

[Row(Group='celery', Subgroup='vegetable'),
 Row(Group='corn', Subgroup='vegetable'),
 Row(Group='cucumber', Subgroup='vegetable'),
 Row(Group='horseradish', Subgroup='vegetable'),
 Row(Group='vegetable', Subgroup='vegetable')]

In [91]:
fil_flavor=filter_flavor.map(flavor_schema)

In [92]:
# Build a DataFrame from the RDD of Row objects.
flavor_df = spark.createDataFrame(fil_flavor)

In [93]:
flavor_df.show(5)

+-----------+---------+
|      Group| Subgroup|
+-----------+---------+
|     celery|vegetable|
|       corn|vegetable|
|   cucumber|vegetable|
|horseradish|vegetable|
|  vegetable|vegetable|
+-----------+---------+
only showing top 5 rows



In [95]:
# Split each line once, then keep the first two fields as a (group, sub group) tuple.
(
    flavors
    .map(lambda row: row.split(','))
    .map(lambda fields: (fields[0], fields[1]))
    .take(4)
)

[('GROUP', 'SUB GROUP'),
 ('celery', 'vegetable'),
 ('corn', 'vegetable'),
 ('cucumber', 'vegetable')]

In [96]:
flavors.take(4)

['GROUP,SUB GROUP', 'celery,vegetable', 'corn,vegetable', 'cucumber,vegetable']

In [103]:
# Word count: one (word, 1) pair per field, summed per word.
(
    flavors
    .flatMap(lambda line: line.split(','))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
    .take(15)
)

[('celery', 1),
 ('vegetable', 9),
 ('horseradish', 1),
 ('herbaceous', 6),
 ('clove', 1),
 ('sage', 1),
 ('butter', 1),
 ('fatty', 8),
 ('hop_oil', 1),
 ('oily', 1),
 ('floral', 15),
 ('carnation', 1),
 ('geranium', 1),
 ('hawthorne', 1),
 ('hyacinth', 1)]

#### Use of "groupBy" on an RDD
           "groupBy" does not need key-value input: it takes a function that computes a key for each element
           and returns (key, group of elements) pairs.
           Ex: in the example below we group all the words that share the same first 4 letters;
           those four letters are the key, and all the words starting with them are the values for that key.
           The function passed to groupBy returns the first 4 letters of each element of the RDD.

In [127]:
# Group every field of the file by its first four characters.
new_rdd = (
    flavors
    .flatMap(lambda line: line.split(','))
    .groupBy(lambda word: word[:4])
)

#### In the example below we have taken only 10 records;
     we can take as many records as we want.
     See below:

In [131]:
# Each group value is a lazy iterable; materialise it as a list so it prints readably.
for prefix, words in new_rdd.take(10):
    print(prefix, list(words))

SUB  ['SUB GROUP']
cucu ['cucumber', 'cucumber seed', 'cucumber skin']
pota ['potato']
toma ['tomato', 'tomato leaf']
herb ['herbaceous', 'herbaceous', 'herbaceous', 'herbaceous', 'herbaceous', 'herbaceous', 'herb', 'herbal']
sage ['sage']
butt ['butter', 'butterscotch', 'buttered', 'buttermilk', 'buttery']
crea ['creamy', 'cream']
oily ['oily']
coum ['coumarin', 'coumarinic']


### Transformation: groupByKey / reduceByKey 
                   Q4: What if we want to calculate how many times each word occurs in the corpus?

                  Solution: We can apply the “groupByKey” / “reduceByKey” transformations on a (key,val) pair RDD. “groupByKey” groups the values for each key in the original RDD, producing a new pair in which the original key maps to the collected group of values.

        To use “groupByKey” / “reduceByKey” to find the frequency of each word, follow the steps below:

     A (key,val) pair RDD is required; in this pair RDD the key is the word and val is 1 for each occurrence of the word in “flavors”.
    To apply “groupByKey” / “reduceByKey” on “flavors”, we first need to convert it to a (key,val) pair RDD.

In [150]:
# Pair every field with 1, then collect all the 1s that belong to the same word.
bykey_flavor = (
    flavors
    .flatMap(lambda line: line.split(','))
    .map(lambda word: (word, 1))
    .groupByKey()
)

In [151]:
bykey_flavor.take(10)

[('celery', <pyspark.resultiterable.ResultIterable at 0x52b01477c8>),
 ('vegetable', <pyspark.resultiterable.ResultIterable at 0x52b013f348>),
 ('horseradish', <pyspark.resultiterable.ResultIterable at 0x52b0147088>),
 ('herbaceous', <pyspark.resultiterable.ResultIterable at 0x52b0147248>),
 ('clove', <pyspark.resultiterable.ResultIterable at 0x52b0147e48>),
 ('sage', <pyspark.resultiterable.ResultIterable at 0x52b0147a08>),
 ('butter', <pyspark.resultiterable.ResultIterable at 0x52b0147308>),
 ('fatty', <pyspark.resultiterable.ResultIterable at 0x52b01478c8>),
 ('hop_oil', <pyspark.resultiterable.ResultIterable at 0x52b0147fc8>),
 ('oily', <pyspark.resultiterable.ResultIterable at 0x52b0147ac8>)]

In [153]:
# Materialise each grouped iterable so the collected 1s are visible.
for word, ones in bykey_flavor.take(10):
    print((word, list(ones)))

('celery', [1])
('vegetable', [1, 1, 1, 1, 1, 1, 1, 1, 1])
('horseradish', [1])
('herbaceous', [1, 1, 1, 1, 1, 1])
('clove', [1])
('sage', [1])
('butter', [1])
('fatty', [1, 1, 1, 1, 1, 1, 1, 1])
('hop_oil', [1])
('oily', [1])


In [154]:
bykey_flavor.mapValues(sum).take(10)

[('celery', 1),
 ('vegetable', 9),
 ('horseradish', 1),
 ('herbaceous', 6),
 ('clove', 1),
 ('sage', 1),
 ('butter', 1),
 ('fatty', 8),
 ('hop_oil', 1),
 ('oily', 1)]

In [161]:
# Sum the 1s per word, swap to (count, word) and list the ten highest counts.
(
    bykey_flavor
    .mapValues(sum)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
    .take(10)
)

[(743, 'NULL'),
 (25, 'fruity'),
 (15, 'floral'),
 (11, 'balsamic'),
 (9, 'vegetable'),
 (8, 'fatty'),
 (6, 'herbaceous'),
 (6, 'alcohol'),
 (6, 'nutty'),
 (6, 'citrus')]

### "reduceByKey"

In [11]:
# Word counts over the flavor lines with the header removed.
# `filter_file_rdd_2` was never defined in this notebook (left-over kernel state);
# `filter_flavor` is the header-free flavors RDD built earlier.
# NOTE(review): assumed to be the same data as the old name — confirm.
reduce_flavors_rdd = (
    filter_flavor
    .flatMap(lambda e: e.split(','))
    .map(lambda e: (e, 1))
    .reduceByKey(lambda x, y: x + y)
)
print(" \n reduce flavors file rdd :")
for ele in reduce_flavors_rdd.collect():
    print(ele)

 
 reduce flavors file rdd :
('celery', 1)
('vegetable', 9)
('horseradish', 1)
('herbaceous', 6)
('clove', 1)
('sage', 1)
('butter', 1)
('fatty', 8)
('hop_oil', 1)
('oily', 1)
('floral', 15)
('carnation', 1)
('geranium', 1)
('hawthorne', 1)
('hyacinth', 1)
('rose', 1)
('anise', 1)
('butterscotch', 1)
('chocolate', 1)
('honey', 1)
('others', 1)
('sweet', 1)
('vanilla', 1)
('melon', 1)
('orange', 1)
('woody', 5)
('maple', 1)
('bacon', 1)
('beef', 1)
('green', 4)
('wintergreen', 1)
('alcohol', 6)
('wine_like', 1)
('rum', 1)
('mushroom', 1)
('apricot', 1)
('banana', 1)
('cherry', 1)
('coconut', 1)
('grapefruit', 1)
('jam', 1)
('mango', 1)
('peach', 1)
('pear', 1)
('pineapple', 1)
('raspberry', 1)
('tart', 1)
('watermelon', 1)
('plastic', 1)
('sulfurous', 1)
('nutty', 6)
('peanut', 1)
('walnut', 1)
('camphoraceous', 2)
('tobacco', 2)
('minty', 2)
('mossy', 2)
('smoky', 2)
('musky', 2)
('soapy', 2)
('seedy', 2)
('alkaline', 1)
('NULL', 743)
('alkane', 1)
('almond shell', 1)
('amine', 1)
('ba

### Note:

    After applying reduceByKey we swap the key and value of each pair (.map(lambda x:(x[1],x[0]))) so that the
    count becomes the key.
    Then we sort with sortByKey(ascending=False), i.e. in descending order of count.
    See below:

In [166]:
 flavors.flatMap(lambda x:x.split(','))\
    .map(lambda x:(x,1))\
    .reduceByKey(lambda x,y:x+y)\
    .map(lambda x:(x[1],x[0]))\
    .sortByKey(ascending=False)\
    .take(10)

[(743, 'NULL'),
 (25, 'fruity'),
 (15, 'floral'),
 (11, 'balsamic'),
 (9, 'vegetable'),
 (8, 'fatty'),
 (6, 'herbaceous'),
 (6, 'alcohol'),
 (6, 'nutty'),
 (6, 'citrus')]

In [171]:
# Split each line once, then emit (second field, first field).
(
    flavors
    .map(lambda row: row.split(','))
    .map(lambda fields: (fields[1], fields[0]))
    .take(20)
)

[('SUB GROUP', 'GROUP'),
 ('vegetable', 'celery'),
 ('vegetable', 'corn'),
 ('vegetable', 'cucumber'),
 ('vegetable', 'horseradish'),
 ('vegetable', 'vegetable'),
 ('vegetable', 'potato'),
 ('vegetable', 'tomato'),
 ('herbaceous', 'caraway'),
 ('herbaceous', 'clove'),
 ('herbaceous', 'fennel'),
 ('herbaceous', 'herbaceous'),
 ('herbaceous', 'sage'),
 ('fatty', 'butter'),
 ('fatty', 'cheese'),
 ('fatty', 'creamy'),
 ('fatty', 'hop_oil'),
 ('fatty', 'oily'),
 ('fatty', 'fatty'),
 ('fatty', 'sour')]

In [172]:
# Two small pair RDDs that share the keys 1 and 2, used for the cogroup demo.
lst1 = [(1, "A"), (2, "B")]
lst2 = [(1, "P"), (2, "L")]
rdd1 = sc.parallelize(lst1)
rdd2 = sc.parallelize(lst2)
rdd1.collect()

[(1, 'A'), (2, 'B')]

In [23]:
rdd2.collect()

[(1, 'P'), (2, 'L')]

In [24]:
rdd1.cogroup(rdd2).collect()

[(1,
  (<pyspark.resultiterable.ResultIterable at 0x1bbc1ca388>,
   <pyspark.resultiterable.ResultIterable at 0x1bbc1ca5c8>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x1bbc1ca7c8>,
   <pyspark.resultiterable.ResultIterable at 0x1bbc1ca508>))]

In [14]:
reduce_flavors_rdd.count()

857

In [15]:
reduce_food_rdd.count()

1676

In [16]:
# The next cell reads `join_rdd`, so the join has to run rather than stay commented out.
# Joins on the word key; each value becomes (flavor_count, food_count).
# NOTE(review): `reduce_food_rdd` is not defined in the visible cells — confirm it is built before this cell.
join_rdd = reduce_flavors_rdd.join(reduce_food_rdd)

In [17]:
join_rdd.collect()

[('NULL', (743, 252))]

In [None]:
join_rdd_1=reduce_lavors_rdd.join