<a href="https://colab.research.google.com/github/maheshbabu-r/BIG_DATA/blob/main/PySpark_Assignments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pwd
!ls
!python --version

/content
iot_devices.json  sample_data
Python 3.7.11


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz  # supress download output use -q

!tar -zxvf spark-3.1.2-bin-hadoop3.2.tgz | grep "something" 2>/dev/null #Suppress tar output ---| grep "something" 2>/dev/null--- add after file_name"

!pip -q install findspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext

from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop

# or import like this
from pyspark.sql.functions import *

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession.builder.appName("PySpark 3.0 Setup on Google Colab").getOrCreate()
print(spark.sparkContext.appName)

pyspark-shell


#Assignment 1
##word-count(shakespeare.txt)

##### Problem statement:
You work as a Big Data Engineer at GrapeVine Pvt. Ltd. Your company is currently working on
English language analytics using Hadoop. Having performed this task already, the company
requires you to increase the computational efficiency using Apache Spark. You have been
assigned certain tasks for the fulfillment of this activity.

In [4]:
!wget -q https://raw.githubusercontent.com/maheshbabu-r/BIG_DATA/main/Hadoop%20Datasets/Shakespeare.txt

In [5]:
words=sc.textFile("/content/Shakespeare.txt")

In [6]:
#words.collect() #-- for all the data
words.take(10)

["Project Gutenberg's Etext of Shakespeare's First Folio/35 Plays",
 'This is our 3rd edition of most of these plays.  See the index.',
 '',
 '',
 'Copyright laws are changing all over the world, be sure to check',
 'the copyright laws for your country before posting these files!!',
 '',
 'Please take a look at the important information in this header.',
 'We encourage you to keep this file on your own disk, keeping an',
 'electronic path open for the next readers.  Do not remove this.']

In [7]:
rdd=words.flatMap(lambda x : x.split(" ")).map(lambda x : (x,1))

In [8]:
# print(rdd.collect()) #-- for all the data
print(rdd.take(10))

[('Project', 1), ("Gutenberg's", 1), ('Etext', 1), ('of', 1), ("Shakespeare's", 1), ('First', 1), ('Folio/35', 1), ('Plays', 1), ('This', 1), ('is', 1)]


In [9]:
rdd1=rdd.reduceByKey(lambda x,y:x+y)

In [10]:
# 1. Find out the count of each word in the ‘Shakespeare.txt’ dataset

#print(rdd1.collect()) #-- for all records

print(rdd1.take(10))

[('Project', 25), ('Etext', 7), ('of', 14571), ("Shakespeare's", 8), ('is', 7459), ('3rd', 1), ('edition', 3), ('these', 1000), ('plays.', 1), ('', 107574)]


In [11]:
type(rdd1.collect())

list

In [12]:
df1 = spark.createDataFrame(rdd1).toDF("key","value")

In [13]:
df1.show(10)

+-------------+------+
|          key| value|
+-------------+------+
|      Project|    25|
|        Etext|     7|
|           of| 14571|
|Shakespeare's|     8|
|           is|  7459|
|          3rd|     1|
|      edition|     3|
|        these|  1000|
|       plays.|     1|
|             |107574|
+-------------+------+
only showing top 10 rows



In [14]:
df1.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: long (nullable = true)



In [15]:
# 2. Display the most commonly used words (words with the count over 100 are considered common)

df1.filter(df1.value>100).show()

+-------+------+
|    key| value|
+-------+------+
|     of| 14571|
|     is|  7459|
|  these|  1000|
|       |107574|
|    are|  2777|
| world,|   104|
|   sure|   208|
| before|   472|
|   take|   934|
|     at|  2074|
|     in|  8779|
|   this|  4669|
|     an|  1307|
|     Do|   277|
|    The|  3321|
|  Since|   131|
|further|   129|
|    new|   191|
|     we|  2284|
|     do|  1695|
+-------+------+
only showing top 20 rows



In [16]:
df1.filter(df1.value>100).orderBy(df1.value.desc()).show()

+----+------+
| key| value|
+----+------+
|    |107574|
| the| 21652|
|   I| 19071|
| and| 16624|
|  to| 14978|
|  of| 14571|
|   a| 12121|
|  my| 10465|
| you|  9838|
|  in|  8779|
|  is|  7459|
|that|  7179|
| And|  7029|
| not|  6761|
|with|  6217|
|your|  6186|
| his|  5783|
|  be|  5454|
| for|  5372|
|  it|  5329|
+----+------+
only showing top 20 rows



In [17]:
# 3. Display the words that are rarely used (words with the count below 30 are considered rare)

df1.filter(df1.value<30).orderBy(df1.value.asc()).show()

+-----------------+-----+
|              key|value|
+-----------------+-----+
|         Readable|    1|
|       Volunteers|    1|
|00ws110.zip******|    1|
|       contacting|    1|
|          files!!|    1|
|         included|    1|
|       electronic|    1|
|              [35|    1|
|          Vanilla|    1|
|           Plays]|    1|
|           Humans|    1|
|            July,|    1|
|           *These|    1|
|             2000|    1|
|        Copyright|    1|
|           [Etext|    1|
|        **Welcome|    1|
|           #2270]|    1|
|       Computers,|    1|
|        *****This|    1|
+-----------------+-----+
only showing top 20 rows



In [18]:
# 4. Display the most commonly used word
df1.orderBy(df1.value.desc()).show(1)

+---+------+
|key| value|
+---+------+
|   |107574|
+---+------+
only showing top 1 row



In [19]:
# 5. Display the least used word
df1.orderBy(df1.value.asc()).show(1)

+-------+-----+
|    key|value|
+-------+-----+
|files!!|    1|
+-------+-----+
only showing top 1 row



#Assignment 2
##IOT_Devices

_<b>Problem Statement</b>:_
Imagine that you are working as an analyst for a retail firm. Your firm collects sensor data from
all over the world. You need to analyze the same and provide insights to customers. Since you
are collecting huge amounts of sensor data, the analysis platform is chosen as Spark SQL.

In [20]:
iot = spark.read.csv(header=True, inferSchema=True, path="/content/iot_devices.json")

iot.show(10)

+----------------+--------------------------------------+---------------------+-------------+--------------+----------------------+----------------------+------------------------+-------------------+-----------+---------------+-------------------+------------------+----------------+-----------------------------+
| {"device_id": 1| "device_name": "meter-gauge-1xbYRYcj"| "ip": "68.161.225.1"| "cca2": "US"| "cca3": "USA"| "cn": "United States"| "latitude": 38.000000| "longitude": -97.000000| "scale": "Celsius"| "temp": 34| "humidity": 51| "battery_level": 8|  "c02_level": 868|  "lcd": "green"| "timestamp" :1458444054093 }|
+----------------+--------------------------------------+---------------------+-------------+--------------+----------------------+----------------------+------------------------+-------------------+-----------+---------------+-------------------+------------------+----------------+-----------------------------+
| {"device_id": 2|                   "device_name": "...| 

In [21]:
iot.printSchema()

root
 |-- {"device_id": 1: string (nullable = true)
 |--  "device_name": "meter-gauge-1xbYRYcj": string (nullable = true)
 |--  "ip": "68.161.225.1": string (nullable = true)
 |--  "cca2": "US": string (nullable = true)
 |--  "cca3": "USA": string (nullable = true)
 |--  "cn": "United States": string (nullable = true)
 |--  "latitude": 38.000000: string (nullable = true)
 |--  "longitude": -97.000000: string (nullable = true)
 |--  "scale": "Celsius": string (nullable = true)
 |--  "temp": 34: string (nullable = true)
 |--  "humidity": 51: string (nullable = true)
 |--  "battery_level": 8: string (nullable = true)
 |--  "c02_level": 868: string (nullable = true)
 |--  "lcd": "green": string (nullable = true)
 |--  "timestamp" :1458444054093 }: string (nullable = true)



#Assignment 3
##Analysis(Reliance)

$Problem statement:$
You work as a Big Data Engineer at GrapeVine Pvt. Ltd. Your company is currently working as a
Data Analytics consultant for a hedge fund. Due to the size of the available dataset, the
company requires you to increase computational efficiency using Apache Spark. You have been
assigned certain tasks for the fulfillment of this analysis through stock market backtesting.

In [22]:
!wget -q https://raw.githubusercontent.com/maheshbabu-r/BIG_DATA/main/Hadoop%20Datasets/NSE_RELIANCE_5_1.csv

In [23]:
df = spark.read.csv(header=True, inferSchema=True, path="/content/NSE_RELIANCE_5_1.csv")

df.show(2)

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------+
|               time|       open|       high|        low|      close|        MA5|        MA6|MA7|MA8|MA9|MA10|Volume|Volume MA|   Histogram|        MACD|      Signal|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------+
|2020-04-03 07:20:00|1053.771557|1056.297519|1051.790411|1053.424857|1056.784037|1055.267738|NaN|NaN|NaN| NaN|354666| 140161.0|-0.038974088|-1.535973794|-1.496999706|
|2020-04-03 07:25:00|1053.573443|1055.455532|  1053.3258|1054.266844|1056.555201|1055.017514|NaN|NaN|NaN| NaN| 59611|133630.15|-0.051333212| -1.56116622|-1.509833009|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+---+---+---+----+------+---------+------------+------------+------------

In [24]:
df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- MA5: double (nullable = true)
 |-- MA6: double (nullable = true)
 |-- MA7: double (nullable = true)
 |-- MA8: double (nullable = true)
 |-- MA9: double (nullable = true)
 |-- MA10: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Volume MA: double (nullable = true)
 |-- Histogram: double (nullable = true)
 |-- MACD: double (nullable = true)
 |-- Signal: double (nullable = true)



In [25]:
df=df.select("time","open","high","low","close")

In [26]:
# 1. Find out the average ‘close’ price of Reliance throughout the duration of the dataset
df.select(avg("close")).show()

+-----------------+
|       avg(close)|
+-----------------+
|1414.248173455824|
+-----------------+



In [27]:
# 2. If a Reliance stock was bought at the beginning of the trading day, ‘2020-04-07’ (YYYY-MM-DD),
# at the close price of the first 5-minute window, scan the dataset to find out the point to sell the
# stock to maximize profits. You are required to print the specific timestamp

close_val=df.select("close").filter(to_date("time")=='2020-04-07').orderBy("time").show(1)
print(close_val)

df.withColumn('Result',(df['high']-1101.319061)).filter(to_date("time")=='2020-04-07').orderBy(desc("Result")).show(1)


+-----------+
|      close|
+-----------+
|1101.319061|
+-----------+
only showing top 1 row

None
+-------------------+-----------+-----------+-----------+-----------+------------------+
|               time|       open|       high|        low|      close|            Result|
+-------------------+-----------+-----------+-----------+-----------+------------------+
|2020-04-07 09:30:00|1192.402249|1202.555622|1192.402249|1199.583903|101.23656100000017|
+-------------------+-----------+-----------+-----------+-----------+------------------+
only showing top 1 row



In [28]:
# 3.Find out the net profit or net loss to be accumulated if one stock of Reliance is bought at the
# opening of every 5-minute slot and sold at the lowest possible point in that 5-minute slot

df1=df.withColumn("Losses",(df['low']-df['open']))
df1.select(sum("Losses")).show()

+------------------+
|       sum(Losses)|
+------------------+
|-8877.819898999991|
+------------------+



In [29]:
# 4. Find out the net profit or net loss to be accumulated if one stock of Reliance is bought at the
# opening of every 5-minute slot and sold at the highest possible point in that 5-minute slot

df1=df1.withColumn("Profit",(df['low']-df['open']))
df1.select(sum("Profit")).show()

+------------------+
|       sum(Profit)|
+------------------+
|-8877.819898999991|
+------------------+



#Assignment 4
##Analysis(Suicides)

In [30]:
!wget -q https://raw.githubusercontent.com/maheshbabu-r/BIG_DATA/main/Hadoop%20Datasets/Suicides.csv

In [31]:
suicide=spark.read.csv(header=True,inferSchema=True,path="/content/Suicides.csv")
suicide.printSchema()
suicide.show(5)

root
 |-- State: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Type_code: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age_group: string (nullable = true)
 |-- Total: integer (nullable = true)

+-------------+----+---------+--------------------+------+---------+-----+
|        State|Year|Type_code|                Type|Gender|Age_group|Total|
+-------------+----+---------+--------------------+------+---------+-----+
|A & N Islands|2001|   Causes|  Illness (Aids/STD)|Female|     0-14|    0|
|A & N Islands|2001|   Causes|Bankruptcy or Sud...|Female|     0-14|    0|
|A & N Islands|2001|   Causes|Cancellation/Non-...|Female|     0-14|    0|
|A & N Islands|2001|   Causes|Physical Abuse (R...|Female|     0-14|    0|
|A & N Islands|2001|   Causes|       Dowry Dispute|Female|     0-14|    0|
+-------------+----+---------+--------------------+------+---------+-----+
only showing top 5 rows



In [32]:
# 1.Find out the most common suicide cause among females in India over the entire period 2001–2012
suicide.filter(col("Gender")=='Female').groupBy("Type").count().orderBy(desc("count")).show(10)

+--------------------+-----+
|                Type|count|
+--------------------+-----+
|Others (Please Sp...| 3512|
|Illegitimate Preg...| 2100|
|             Divorce| 2100|
|Fall in Social Re...| 2100|
|By touching elect...| 2100|
|    Property Dispute| 2100|
|By Overdose of sl...| 2100|
|Farming/Agricultu...| 2100|
|By coming under r...| 2100|
|     Family Problems| 2100|
+--------------------+-----+
only showing top 10 rows



In [33]:
# 2.Find out the state-wise most common cause among males over the entire period
suicide.filter(col("Gender")=='Male').groupBy("Type").count().orderBy(desc("count")).show(10)

+--------------------+-----+
|                Type|count|
+--------------------+-----+
|Others (Please Sp...| 3751|
|        By Fire-Arms| 2100|
|Fall in Social Re...| 2100|
|Other Causes (Ple...| 2100|
|    Property Dispute| 2100|
|By coming under r...| 2100|
|By touching elect...| 2100|
|By Overdose of sl...| 2100|
|Farming/Agricultu...| 2100|
|             Student| 2100|
+--------------------+-----+
only showing top 10 rows



In [34]:
 # 3.Find out the age group-wise most common cause among males and females
 suicide.groupBy("Age_group","Gender","Type").count().orderBy(desc("count")).show(10)

+---------+------+--------------------+-----+
|Age_group|Gender|                Type|count|
+---------+------+--------------------+-----+
|    30-44|  Male|Others (Please Sp...|  788|
|    15-29|  Male|Others (Please Sp...|  786|
|    45-59|  Male|Others (Please Sp...|  772|
|    15-29|Female|Others (Please Sp...|  757|
|      60+|  Male|Others (Please Sp...|  731|
|    30-44|Female|Others (Please Sp...|  725|
|    45-59|Female|Others (Please Sp...|  694|
|     0-14|  Male|Others (Please Sp...|  674|
|     0-14|Female|Others (Please Sp...|  673|
|      60+|Female|Others (Please Sp...|  663|
+---------+------+--------------------+-----+
only showing top 10 rows



In [35]:
print(suicide.select("state").distinct().count())
print(suicide.select("Year").distinct().count())

38
12


In [36]:
# 4.Find out the total number of suicides per year per state
suicide.groupBy("Year","State").count().orderBy("Year","count").show(38*12)

+----+-----------------+-----+
|Year|            State|count|
+----+-----------------+-----+
|2001|Total (All India)|   26|
|2001|      Total (Uts)|   26|
|2001|   Total (States)|   26|
|2001|Arunachal Pradesh|  556|
|2001|      Lakshadweep|  556|
|2001|         Nagaland|  557|
|2001|      Daman & Diu|  557|
|2001|           Sikkim|  559|
|2001|     D & N Haveli|  559|
|2001|       Puducherry|  561|
|2001|          Manipur|  561|
|2001|    A & N Islands|  562|
|2001|       Chandigarh|  562|
|2001|          Mizoram|  563|
|2001|              Goa|  563|
|2001|      Uttarakhand|  563|
|2001|        Meghalaya|  563|
|2001|           Punjab|  564|
|2001|  Jammu & Kashmir|  564|
|2001|          Haryana|  565|
|2001|           Odisha|  565|
|2001|        Jharkhand|  565|
|2001|     Chhattisgarh|  565|
|2001|            Assam|  565|
|2001|      West Bengal|  566|
|2001|          Gujarat|  566|
|2001|       Tamil Nadu|  566|
|2001|        Karnataka|  566|
|2001|   Madhya Pradesh|  566|
|2001|  