In [64]:
from pyspark.sql import SparkSession
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import collect_list, col
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# **Q: 01**

In [None]:
spark = SparkSession.builder.master('local[*]').appName("Lab 09").getOrCreate()

In [None]:
rdd1 = spark.read.csv("emp1.txt", sep=',', inferSchema=True, header=True)

rdd1.show()

+------+--------+--------------------+
|emp_id|emp_name|            emp_dept|
+------+--------+--------------------+
|     1|    John|   Digital Marketing|
|     2|   Alice|Software Development|
|     3|     Bob|        Data Science|
|     4| Charlie|     Human Resources|
|     5|     Eve|             Finance|
|     6| Michael|Software Development|
|     7|  Olivia|        Data Science|
|     8|  Sophia|             Finance|
|     9|    Liam|        Data Science|
|    10|   Emily|Software Development|
|    11|   James|             Finance|
|    12|Benjamin|   Digital Marketing|
|    13|    Emma|Software Development|
|    14|     Ava|        Data Science|
|    15|Isabella|     Human Resources|
|    16|   Lucas|             Finance|
|    17|   Avery|Software Development|
|    18|   Grace|        Data Science|
|    19|   Ethan|     Human Resources|
|    20|Scarlett|Software Development|
+------+--------+--------------------+
only showing top 20 rows



In [None]:
rdd2 = spark.read.csv("emp2.txt", sep=',', inferSchema=True, header=True)

rdd2.show()

+------+----------+--------------+
|emp_id|emp_salary|emp_experience|
+------+----------+--------------+
|     1|     50000|             2|
|     2|     65000|             5|
|     3|     70000|             3|
|     4|     55000|             1|
|     5|     60000|             4|
|     6|     80000|             6|
|     7|     90000|             3|
|     8|     75000|             2|
|     9|     55000|             4|
|    10|     70000|             5|
|    11|     60000|             3|
|    12|     45000|             1|
|    13|     85000|             7|
|    14|     75000|             4|
|    15|     70000|             2|
|    16|     80000|             3|
|    17|     55000|             1|
|    18|     65000|             4|
|    19|     60000|             2|
|    20|     70000|             3|
+------+----------+--------------+
only showing top 20 rows



In [None]:
rdd3 = rdd1.join(rdd2, rdd2.emp_id == rdd1.emp_id).select(rdd1.emp_id, 'emp_name', 'emp_dept', 'emp_salary')

rdd3.show()

+------+--------+--------------------+----------+
|emp_id|emp_name|            emp_dept|emp_salary|
+------+--------+--------------------+----------+
|     1|    John|   Digital Marketing|     50000|
|     2|   Alice|Software Development|     65000|
|     3|     Bob|        Data Science|     70000|
|     4| Charlie|     Human Resources|     55000|
|     5|     Eve|             Finance|     60000|
|     6| Michael|Software Development|     80000|
|     7|  Olivia|        Data Science|     90000|
|     8|  Sophia|             Finance|     75000|
|     9|    Liam|        Data Science|     55000|
|    10|   Emily|Software Development|     70000|
|    11|   James|             Finance|     60000|
|    12|Benjamin|   Digital Marketing|     45000|
|    13|    Emma|Software Development|     85000|
|    14|     Ava|        Data Science|     75000|
|    15|Isabella|     Human Resources|     70000|
|    16|   Lucas|             Finance|     80000|
|    17|   Avery|Software Development|     55000|


# **Q: 02**

In [None]:
rdd4 = rdd1.join(rdd2, rdd2.emp_id == rdd1.emp_id, "left_outer").select(rdd1.emp_id, 'emp_name', 'emp_dept', 'emp_salary')

rdd4.show()

+------+--------+--------------------+----------+
|emp_id|emp_name|            emp_dept|emp_salary|
+------+--------+--------------------+----------+
|     1|    John|   Digital Marketing|     50000|
|     2|   Alice|Software Development|     65000|
|     3|     Bob|        Data Science|     70000|
|     4| Charlie|     Human Resources|     55000|
|     5|     Eve|             Finance|     60000|
|     6| Michael|Software Development|     80000|
|     7|  Olivia|        Data Science|     90000|
|     8|  Sophia|             Finance|     75000|
|     9|    Liam|        Data Science|     55000|
|    10|   Emily|Software Development|     70000|
|    11|   James|             Finance|     60000|
|    12|Benjamin|   Digital Marketing|     45000|
|    13|    Emma|Software Development|     85000|
|    14|     Ava|        Data Science|     75000|
|    15|Isabella|     Human Resources|     70000|
|    16|   Lucas|             Finance|     80000|
|    17|   Avery|Software Development|     55000|


# **Q: 03**

In [None]:
rdd5 = rdd1.join(rdd2, rdd2.emp_id == rdd1.emp_id, "left_outer").withColumn("Total Salary", rdd2.emp_salary*rdd2.emp_experience).select(rdd1.emp_id, 'Total Salary')

rdd5.show()

+------+------------+
|emp_id|Total Salary|
+------+------------+
|     1|      100000|
|     2|      325000|
|     3|      210000|
|     4|       55000|
|     5|      240000|
|     6|      480000|
|     7|      270000|
|     8|      150000|
|     9|      220000|
|    10|      350000|
|    11|      180000|
|    12|       45000|
|    13|      595000|
|    14|      300000|
|    15|      140000|
|    16|      240000|
|    17|       55000|
|    18|      260000|
|    19|      120000|
|    20|      210000|
+------+------------+
only showing top 20 rows



In [None]:
spark.stop()

# **Q: 04**

In [None]:
conf = SparkConf().setAppName("Lab 09").setMaster("local")
sc = SparkContext(conf = conf)

In [14]:
rdd1 = sc.parallelize([(1, 'Emama'), (2, 'Iqra')])

rdd2 = sc.parallelize([(1, 20), (2, 22)])

In [15]:
broadcastAges = sc.broadcast(dict(rdd2.collect()))
rdd3 = rdd1.map(lambda x: (x[0], x[1], broadcastAges.value[x[0]]))

rdd3.collect()

[(1, 'Emama', 20), (2, 'Iqra', 22)]

# **Q: 05**

In [16]:
rdd4 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

In [17]:
sum = sc.accumulator(0)

In [18]:
def func(x):
  global sum
  sum += x

In [19]:
rdd4.foreach(func)
final = sum.value

print("Accumulated value is -> %i" % (final))

Accumulated value is -> 55


In [20]:
sc.stop()

# **Q: 06**

In [21]:
spark = SparkSession.builder.master('local[*]').appName("Lab 09").getOrCreate()

In [22]:
df = spark.read.csv("Movies - Movies.csv", inferSchema=True, header=True)

df.show()

+----+------+--------------------+-------+--------------------+-----------------+--------------------+----------+------+-------------------+
|Year|Length|               Title|  Genre|               Actor|          Actress|            Director|Popularity|Awards|              Image|
+----+------+--------------------+-------+--------------------+-----------------+--------------------+----------+------+-------------------+
|1990|   111|Tie Me Up! Tie Me...| Comedy|     BanderasAntonio|    AbrilVictoria|      AlmodóvarPedro|        68|    No|   NicholasCage.png|
|1991|   113|          High Heels| Comedy|          BoséMiguel|    AbrilVictoria|      AlmodóvarPedro|        68|    No|   NicholasCage.png|
|1983|   104|        Dead ZoneThe| Horror|   WalkenChristopher|      AdamsBrooke|     CronenbergDavid|        79|    No|   NicholasCage.png|
|1979|   122|                Cuba| Action|         ConnerySean|      AdamsBrooke|       LesterRichard|         6|    No|    seanConnery.png|
|1978|    94|

In [23]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Length: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Actor: string (nullable = true)
 |-- Actress: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Awards: string (nullable = true)
 |-- Image: string (nullable = true)



In [24]:
rdd = df.select('Title', 'Year', 'Director').filter((df['Genre'] == 'Action') & (df['Awards'] == 'Yes'))

rdd.show()

+-----+----+--------+
|Title|Year|Director|
+-----+----+--------+
+-----+----+--------+



# **Q: 07**

In [27]:
rdd1 = df.select('Actor', 'Title', 'Director').filter(df['Awards'] == 'Yes').groupBy('Actor').agg(collect_list('Title').alias('Movies'), collect_list('Director').alias('Directors'))

rdd1.show(truncate=False)

+-----------------+----------------------------------------------------------------+--------------------------------------------+
|Actor            |Movies                                                          |Directors                                   |
+-----------------+----------------------------------------------------------------+--------------------------------------------+
|LintDerek De     |[AssaultThe]                                                    |[RademakersFons]                            |
|LancasterBurt    |[Airport, Come BackLittle Sheba]                                |[SeatonGeorge, MannDaniel]                  |
|BridgesBeau      |[Norma Rae]                                                     |[RittMartin]                                |
|CapolicchioLino  |[Garden of the Finzi-ContinisThe]                               |[De SicaVittorio]                           |
|LoneJohn         |[Last EmperorThe]                                               |[Berto

# **Q: 08**

In [28]:
rdd2 = df.select('title').filter(df['Awards'] == 'No').orderBy(df['Popularity'].desc()).limit(10)

rdd2.show()

+--------------------+
|               title|
+--------------------+
|        Five Corners|
|Ballad of Narayam...|
|         Let It Ride|
|        Final Notice|
|      New Year's Day|
| Guilty by Suspicion|
|   Fellini Satyricon|
|           Raw Nerve|
|     Time MachineThe|
| Long Voyage HomeThe|
+--------------------+



# **Q: 09**

In [29]:
rdd3 = df.select('Title').filter(df['Year'] < '1980').orderBy(df['Popularity'].asc()).limit(10)

rdd3.show()

+------------------+
|             Title|
+------------------+
|   White Lightning|
|      Drop KickThe|
|      Desert Rider|
| Bank on the Stars|
|           Shalako|
|           Airport|
|     Anna Christie|
|Shout at the Devil|
| Tales of Tomorrow|
|         Holocaust|
+------------------+



# **Q: 10**

In [30]:
rdd14 = df.select('Title').filter(df['Year'] < 1990).orderBy('Title')

rdd14.show()

+--------------------+
|               Title|
+--------------------+
|2001: A Space Ody...|
|             48 Hrs.|
|               8 1/2|
|A Big Hand for th...|
|  A Child Is Waiting|
|A Chorus LineThe ...|
|  A Clockwork Orange|
|A Coeur Joie(Head...|
|   A Cry in the Dark|
|  A Dry White Season|
|      A Fine Madness|
| A Fish Called Wanda|
|A Fistful of Dollars|
|     A Guy Named Joe|
|    A Lesson in Love|
|A Little Night Music|
|     A Man & a Woman|
|A Man & a Woman: ...|
|A Man for All Sea...|
|    A Matter of Time|
+--------------------+
only showing top 20 rows



# **Q: 11**

In [51]:
df =  spark.read.csv('WineQT.csv', inferSchema=True, header=True)

df.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+---+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality| Id|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+---+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|  0|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|  1|
|          7.8|            0.76|       0.04|           2.3|              0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|  2|
|         11.2| 

In [52]:
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- Id: integer (nullable = true)



In [53]:
df = df.drop('Id')

In [54]:
df.describe().show()

+-------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+
|summary|     fixed acidity|   volatile acidity|        citric acid|    residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|             density|                pH|         sulphates|           alcohol|           quality|
+-------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+
|  count|              1143|               1143|               1143|              1143|               1143|               1143|                1143|                1143|              1143|              1143|              1143|              1143|
|   mean| 8.3111

In [55]:
df = df.na.drop()

In [57]:
for column in df.columns:
  quantiles = df.approxQuantile(column, [0.25, 0.75], 0)
  IQR = quantiles[1] - quantiles[0]
  lower_bound = quantiles[0] - 1.5 * IQR
  upper_bound = quantiles[1] + 1.5 * IQR
  outliers = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()
  df = df.filter((col(column) > lower_bound) | (col(column) < upper_bound))
  print(f"Number of Outliers in {column}: {outliers}")

Number of Outliers in fixed acidity: 44
Number of Outliers in volatile acidity: 14
Number of Outliers in citric acid: 1
Number of Outliers in residual sugar: 110
Number of Outliers in chlorides: 77
Number of Outliers in free sulfur dioxide: 18
Number of Outliers in total sulfur dioxide: 40
Number of Outliers in density: 36
Number of Outliers in pH: 20
Number of Outliers in sulphates: 43
Number of Outliers in alcohol: 12
Number of Outliers in quality: 22


In [58]:
df = df.na.drop()

In [59]:
for i in range(len(df.columns)):
  for j in range(i+1, len(df.columns)):
    correlation = df.corr(df.columns[i], df.columns[j])
    print(f"Correlation between {df.columns[i]} and {df.columns[j]} : {correlation}")

Correlation between fixed acidity and volatile acidity : -0.2507283222922268
Correlation between fixed acidity and citric acid : 0.6731572507629239
Correlation between fixed acidity and residual sugar : 0.17183053523615716
Correlation between fixed acidity and chlorides : 0.10788856981486483
Correlation between fixed acidity and free sulfur dioxide : -0.16483079329778239
Correlation between fixed acidity and total sulfur dioxide : -0.11062836769999608
Correlation between fixed acidity and density : 0.6815008826691711
Correlation between fixed acidity and pH : -0.6851625988235485
Correlation between fixed acidity and sulphates : 0.17459182521278463
Correlation between fixed acidity and alcohol : -0.07505485317982606
Correlation between fixed acidity and quality : 0.12197009989725943
Correlation between volatile acidity and citric acid : -0.5441869374183994
Correlation between volatile acidity and residual sugar : -0.005751096727715592
Correlation between volatile acidity and chlorides :

# **Q: 12**

In [61]:
assembler = VectorAssembler(inputCols=df.columns, outputCol='features')
vectorizedData = assembler.transform(df)

In [63]:
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
model = scaler.fit(vectorizedData)
scaledData = model.transform(vectorizedData)

In [65]:
kMeans = KMeans(featuresCol='scaledFeatures', k=3)
model = kMeans.fit(scaledData)
predictions = model.transform(scaledData)

predictions.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+--------------------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|      scaledFeatures|prediction|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+--------------------+----------+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|[4.23439064960558...|         1|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9

In [66]:
evaluator = ClusteringEvaluator(featuresCol='scaledFeatures', metricName = 'silhouette', distanceMeasure = 'squaredEuclidean')
kMeans_score = evaluator.evaluate(predictions)

print(f'Silhouette Score for K-Means: {kMeans_score}')

Silhouette Score for K-Means: 0.287294989103867


In [67]:
bisecting_kMeans = BisectingKMeans(featuresCol='scaledFeatures', k=3)
model = bisecting_kMeans.fit(scaledData)
predictions = model.transform(scaledData)

predictions.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+--------------------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|      scaledFeatures|prediction|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+--------------------+----------+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|[4.23439064960558...|         0|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9

In [68]:
bisecting_kMeans_score = evaluator.evaluate(predictions)

print(f'Silhouette Score for Bisecting K-Means: {bisecting_kMeans_score}')

Silhouette Score for Bisecting K-Means: 0.18656306141538748


In [69]:
spark.stop()