In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

spark-3.0.1-bin-hadoop3.2/
spark-3.0.1-bin-hadoop3.2/RELEASE
spark-3.0.1-bin-hadoop3.2/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/
spark-3.0.1-bin-hadoop3.2/examples/src/main/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala
spark-3.0.1-bin-hadoop3.2/example

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
import pyspark.sql.functions as psql
from operator import add

**RDD assignments**

1.  Create RDDs in three different ways.

In [5]:
#Parallelize method
rdd_par = spark.sparkContext.parallelize(["Hey how you doing", "Hats are back!", "adoop"])
type(rdd_par)
rdd_par.collect()


['Hey how you doing', 'Hats are back!', 'adoop']

In [6]:
rdd_par.collect()[2]


'adoop'

In [7]:
rdd_par.count()

3

In [12]:
# Using Data Sources
word = spark.sparkContext.textFile('hello.txt')
word.count()
word.collect()


['Nikola Tesla was a Serbian-American inventor, electrical engineer, mechanical engineer, and futurist best known for his contributions to the design of the modern alternating current electricity supply system. ;',
 'Born on 10 July 1856 in Smiljan, Croatia. Died on 7 January 1943, The New Yorker - Wyndham, New York, United States.',
 'Inventions are Alternating current, Induction motor, Tesla coil.']

In [9]:
# Using transformations
rdd_trans = rdd_par.filter(lambda word:word.startswith('H'))
rdd_trans.collect()

['Hey how you doing', 'Hats are back!']

2. Read a text file and count the number of words in the file using RDD operations

In [25]:
rdd_ds = spark.sparkContext.textFile('hello.txt')
word = rdd_ds.flatMap(lambda word: word.split(' '))
print('Final count is: ')
word.count()

Final count is: 


59

3. Write a program to find the word frequency in a given file.


In [26]:
word = rdd_ds.flatMap(lambda word: word.split(' '))
word_freq = word.map(lambda word: (word, 1))
word_freq.reduceByKey(lambda a, b: a + b).collect()



[('Nikola', 1),
 ('Tesla', 2),
 ('was', 1),
 ('inventor,', 1),
 ('electrical', 1),
 ('mechanical', 1),
 ('best', 1),
 ('known', 1),
 ('his', 1),
 ('design', 1),
 ('of', 1),
 ('modern', 1),
 ('alternating', 1),
 ('current', 1),
 ('electricity', 1),
 (';', 1),
 ('10', 1),
 ('July', 1),
 ('1856', 1),
 ('in', 1),
 ('Croatia.', 1),
 ('Died', 1),
 ('January', 1),
 ('1943,', 1),
 ('The', 1),
 ('New', 2),
 ('Yorker', 1),
 ('United', 1),
 ('are', 1),
 ('motor,', 1),
 ('coil.', 1),
 ('a', 1),
 ('Serbian-American', 1),
 ('engineer,', 2),
 ('and', 1),
 ('futurist', 1),
 ('for', 1),
 ('contributions', 1),
 ('to', 1),
 ('the', 2),
 ('supply', 1),
 ('system.', 1),
 ('Born', 1),
 ('on', 2),
 ('Smiljan,', 1),
 ('7', 1),
 ('-', 1),
 ('Wyndham,', 1),
 ('York,', 1),
 ('States.', 1),
 ('Inventions', 1),
 ('Alternating', 1),
 ('current,', 1),
 ('Induction', 1)]

4. Write a program to convert all words in a file to uppercase.

In [27]:
word_upper = rdd_ds.map(lambda word: word.upper())
word_upper.collect()


['NIKOLA TESLA WAS A SERBIAN-AMERICAN INVENTOR, ELECTRICAL ENGINEER, MECHANICAL ENGINEER, AND FUTURIST BEST KNOWN FOR HIS CONTRIBUTIONS TO THE DESIGN OF THE MODERN ALTERNATING CURRENT ELECTRICITY SUPPLY SYSTEM. ;',
 'BORN ON 10 JULY 1856 IN SMILJAN, CROATIA. DIED ON 7 JANUARY 1943, THE NEW YORKER - WYNDHAM, NEW YORK, UNITED STATES.',
 'INVENTIONS ARE ALTERNATING CURRENT, INDUCTION MOTOR, TESLA COIL.']

5. Write a program to convert all words in a file to lowercase.


In [28]:
word_lower = rdd_ds.map(lambda word: word.lower())
word_lower.collect()

['nikola tesla was a serbian-american inventor, electrical engineer, mechanical engineer, and futurist best known for his contributions to the design of the modern alternating current electricity supply system. ;',
 'born on 10 july 1856 in smiljan, croatia. died on 7 january 1943, the new yorker - wyndham, new york, united states.',
 'inventions are alternating current, induction motor, tesla coil.']

6. Write a program to capitalize first leter of each words in file (use string capitalize() method).


In [29]:
word_capital = rdd_ds.map(lambda word: [w.capitalize() for w in word.split(' ')]  )
word_capital = word_capital.map(lambda word: ' '.join(word)  )
word_capital.collect()



['Nikola Tesla Was A Serbian-american Inventor, Electrical Engineer, Mechanical Engineer, And Futurist Best Known For His Contributions To The Design Of The Modern Alternating Current Electricity Supply System. ;',
 'Born On 10 July 1856 In Smiljan, Croatia. Died On 7 January 1943, The New Yorker - Wyndham, New York, United States.',
 'Inventions Are Alternating Current, Induction Motor, Tesla Coil.']

7. Find the longest length of word from given set of words.

In [30]:
longest_word = rdd_ds.flatMap(lambda word: word.split(' '))
longest_word = longest_word.map(lambda word: len(word))
longest_word.max()

16

8. Map the Registration numbers to corresponding branch. 6000 series BDA, 9000 series HDA, 1000 series ML, 2000 series VLSI, 3000 series ES, 4000 series MSc, 5000 series CC. Given registration number, generate a key-value pair of Registration Number and Corresponding Branch.

In [31]:
series = [6050, 9020, 1020, 2020, 3020, 4020, 5020]
rdd_map = spark.sparkContext.parallelize(series)
rdd_seq = rdd_map.map(lambda series: (series, 'ML') if (series >= 1000 and series < 1999)  
                           else ((series, 'VLSI') if (series >= 2000 and series < 2999)
                           else ((series, 'ES') if (series >= 3000 and series < 3999) 
                           else ((series, 'MSc') if (series >= 4000 and series < 4999) 
                           else ((series, 'CC') if (series >= 5000 and series < 5999) 
                           else ((series, 'BDA') if (series >= 6000 and series < 6999)
                           else ((series, 'HDA') if (series >= 9000 and series < 9999) else (series, 'NA'))))))))
print("Key, value are: ")
rdd_seq.collect()


Key, value are: 


[(6050, 'BDA'),
 (9020, 'HDA'),
 (1020, 'ML'),
 (2020, 'VLSI'),
 (3020, 'ES'),
 (4020, 'MSc'),
 (5020, 'CC')]

9. Text file contain numbers. Numbers are separated by one white space. There is no order to store the numbers. One line may contain one or more numbers. Find the maximum, minimum, sum and mean of numbers.


In [32]:
rdd_ds = spark.sparkContext.textFile('num.txt')
rdd_num = rdd_ds.flatMap(lambda n: n.split(' '))
rdd_num = rdd_num.map(lambda n: int(n))
print("The maximum: ", rdd_num.max())
print("The minimum: ", rdd_num.min())
print("The sum: ", rdd_num.sum())
print("The mean: ", rdd_num.mean())


The maximum:  80
The minimum:  1
The sum:  412
The mean:  45.77777777777778


10. A text file (citizen.txt) contains data about citizens of country. Fields (information in file) are Name, dob, Phone, email and state name. Another file contains mapping of state names to state code like Karnataka is codes as KA, TamilNadu as TN, Kerala KL etc. Compress the citizen.txt file by changing full state name to state code.

In [34]:
rdd_citizen = spark.sparkContext.textFile('citizen.txt')
rdd_statecode = spark.sparkContext.textFile('statecode.txt')
rdd_citizen = rdd_citizen.map(lambda citizen: citizen.split(', '))
rdd_statecode = rdd_statecode.map(lambda state: state.split(', '))
citizen = spark.createDataFrame(rdd_citizen, ['Name', 'dob', 'Phone', 'email', 'state name'])
statecodes = spark.createDataFrame(rdd_statecode, ['state name', 'state code'])
citizen.collect()
statecodes.collect()
citizen.join(statecodes, on='state name', how='left').drop('state name').show()
citizen.write.csv('bcitizen.csv')
citizen.rdd.map(lambda x: x[0] + "," + str(x)).repartition(1).saveAsTextFile('/content/drive/MyDrive/Colab Notebooks/bcitizen.txt')

+-------+----------+----------+--------------------+----------+
|   Name|       dob|     Phone|               email|state code|
+-------+----------+----------+--------------------+----------+
| Ayesha|18-10-1998|8660636816|    ala143@gmail.com|       KL |
|  Vivek|28-03-1997|8431978981|vivekvkini798@gma...|        GA|
|Avinash|20-06-1998|9481294738|     avihr@gmail.com|      null|
+-------+----------+----------+--------------------+----------+

