## **Spark**

In [None]:
import os
import sys
import numpy

!pip install pyspark
os.environ["PYSPARK_PYTHON"] = sys.executable

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 52.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=61199f57c428f531f32085cfaf8d5307240e971610e7c2f287f9da18a479e143
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
from pyspark import SparkContext, SparkConf, SQLContext

In [None]:
# SparkConf - Configuration Before SQL & Content
conf = SparkConf().setAppName("babies").setMaster("local[*]") 
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)



In [None]:
rdd = sc.textFile("baby_names.csv")

In [None]:
babies_header  = rdd.first() 
header = sc.parallelize([babies_header])
rdd = rdd.subtract(header)

In [None]:
for line in rdd.take(5):
  print(line)

2013,GAVIN,ST LAWRENCE,M,9
2013,LEVI,ST LAWRENCE,M,9
2013,ELIZA,KINGS,F,16
2013,ZARA,KINGS,F,16
2013,JONATHAN,NEW YORK,M,51


In [None]:
from collections import namedtuple

In [None]:
Baby = namedtuple('Baby', ['year', 
                               'first_name', 
                               'country', 
                               'sex', 
                               'count'])

In [None]:
def map_to_baby(line):
    cols = line.split(",")
    return Baby(year = cols[0],
                  first_name = cols[1].upper(),
                  country = cols[2].upper(),
                  sex = cols[3],
                  count = int(cols[4])) 

In [None]:
baby_rdd = rdd.map(map_to_baby)
df = baby_rdd.toDF()

In [None]:
df.show(5)

+----+----------+-----------+---+-----+
|year|first_name|    country|sex|count|
+----+----------+-----------+---+-----+
|2013|     GAVIN|ST LAWRENCE|  M|    9|
|2013|      LEVI|ST LAWRENCE|  M|    9|
|2013|     ELIZA|      KINGS|  F|   16|
|2013|      ZARA|      KINGS|  F|   16|
|2013|  JONATHAN|   NEW YORK|  M|   51|
+----+----------+-----------+---+-----+
only showing top 5 rows



In [None]:
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- count: long (nullable = true)



In [None]:
from pyspark.sql.functions import sum, avg, count
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr
from pyspark.sql.types import *

a. How many males has born in New York? 

In [None]:
df.filter((df["sex"] == "M") & (df["country"] == "NEW YORK"))\
  .select(sum("count").alias("MalesBabeisBornInNY")).show()

+-------------------+
|MalesBabeisBornInNY|
+-------------------+
|              45884|
+-------------------+



b. What is the country with the most amount of female newborn?

In [None]:
df.filter(df["sex"] == "F").groupBy(df["country"])\
  .agg(sum("count").alias("FemaleBabies"))\
  .orderBy("FemaleBabies", ascending=False)\
  .select("country").show(1)

+-------+
|country|
+-------+
|  KINGS|
+-------+
only showing top 1 row



c. What is the distribution of gender?

In [None]:
total_babies = df.select(sum("count").alias("total_babies")).take(1)[0]\
  .asDict()['total_babies']
  
total_babies

943428

In [None]:
df.rollup("sex").agg(sum("count").alias("total_babies_per_sex"))\
  .withColumn('perc_total_babies_per_sex', F.round((F.col('total_babies_per_sex') / total_babies ) * 100, 2)) \
  .withColumn('perc_total_babies_per_sex',F.concat(F.col('perc_total_babies_per_sex').cast(StringType()),F.lit('%')))\
  .orderBy("total_babies_per_sex").show()

+----+--------------------+-------------------------+
| sex|total_babies_per_sex|perc_total_babies_per_sex|
+----+--------------------+-------------------------+
|   F|              397693|                   42.15%|
|   M|              545735|                   57.85%|
|null|              943428|                   100.0%|
+----+--------------------+-------------------------+



d. What is the most popular name?


In [None]:
df.groupBy(df["first_name"]).agg(sum("count").alias("babies_per_name"))\
  .orderBy("babies_per_name", ascending=False).select("first_name").show(1)

+----------+
|first_name|
+----------+
|   MICHAEL|
+----------+
only showing top 1 row



e. For each country, display the average amount of newborn per year.

In [None]:
df.groupBy("country").pivot("year").agg(F.round(avg("count"), 3)).show(5)

+------------+-----+-----+-----+-----+-----+-----+-----+-----+
|     country| 2007| 2008| 2009| 2010| 2011| 2012| 2013| 2014|
+------------+-----+-----+-----+-----+-----+-----+-----+-----+
|      FULTON|  6.5|5.875|6.364|5.667|  5.9|5.333|5.333|  5.5|
|ST. LAWRENCE| null| null| null| null| null| null| null|  6.4|
| CATTARAUGUS|  6.0|5.964|6.393|6.034|5.722|6.409|  6.5|6.684|
|     STEUBEN|6.846|6.679|6.615|6.878|6.292|6.176|6.308|6.094|
|       YATES| null| null|5.333| null| null| null| null| null|
+------------+-----+-----+-----+-----+-----+-----+-----+-----+
only showing top 5 rows



f. For each year, display the average of gender count. 


In [None]:
df.groupBy("year").pivot("sex").agg(sum("count")).withColumn("TOTAL", expr("F+M"))\
  .withColumn('F', F.round((F.col('F') / col("TOTAL")) * 100, 2)) \
  .withColumn('F',F.concat(F.col('F').cast(StringType()),F.lit('%')))\
  .withColumn('M', F.round((F.col('M') / col("TOTAL")) * 100, 2)) \
  .withColumn('M',F.concat(F.col('M').cast(StringType()),F.lit('%')))\
  .orderBy("year").select("year", "F", "M").show()

+----+------+------+
|year|     F|     M|
+----+------+------+
|2007|42.13%|57.87%|
|2008| 41.5%| 58.5%|
|2009|41.64%|58.36%|
|2010|41.81%|58.19%|
|2011|41.97%|58.03%|
|2012|41.69%|58.31%|
|2013|42.21%|57.79%|
|2014|44.14%|55.86%|
+----+------+------+



In [None]:
import pyspark.pandas as ps



In [None]:
df1 = ps.read_csv("baby_names.csv")



In [None]:
df1.columns = ['year', 'first_name', 'county', 'sex', 'count']

In [None]:
df1.head(10)

Unnamed: 0,year,first_name,county,sex,count
0,2013,GAVIN,ST LAWRENCE,M,9
1,2013,LEVI,ST LAWRENCE,M,9
2,2013,LOGAN,NEW YORK,M,44
3,2013,HUDSON,NEW YORK,M,49
4,2013,GABRIEL,NEW YORK,M,50
5,2013,THEODORE,NEW YORK,M,51
6,2013,ELIZA,KINGS,F,16
7,2013,MADELEINE,KINGS,F,16
8,2013,ZARA,KINGS,F,16
9,2013,DAISY,KINGS,F,16


5. Create two new boolean columns “Sex_M” and “Sex_F”, represent the dummy version of “Sex” column. 

In [None]:
df1 = ps.get_dummies(data = df1,columns = ['sex'])

In [None]:
df1.head(10)

Unnamed: 0,year,first_name,county,count,sex_F,sex_M
0,2013,GAVIN,ST LAWRENCE,9,0,1
1,2013,LEVI,ST LAWRENCE,9,0,1
2,2013,LOGAN,NEW YORK,44,0,1
3,2013,HUDSON,NEW YORK,49,0,1
4,2013,GABRIEL,NEW YORK,50,0,1
5,2013,THEODORE,NEW YORK,51,0,1
6,2013,ELIZA,KINGS,16,1,0
7,2013,MADELEINE,KINGS,16,1,0
8,2013,ZARA,KINGS,16,1,0
9,2013,DAISY,KINGS,16,1,0


**Sex column has already been removed when using the "get_dummies" function**

In [None]:
rdd1 = df1.to_spark()



In [None]:
rdd1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- county: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- sex_F: byte (nullable = true)
 |-- sex_M: byte (nullable = true)



In [None]:
for line in rdd1.take(3):
  print(line)

Row(year=2013, first_name='GAVIN', county='ST LAWRENCE', count=9, sex_F=0, sex_M=1)
Row(year=2013, first_name='LEVI', county='ST LAWRENCE', count=9, sex_F=0, sex_M=1)
Row(year=2013, first_name='LOGAN', county='NEW YORK', count=44, sex_F=0, sex_M=1)


In [None]:
df1 = rdd1.toDF("year", "first_name", "county", "count", "sex_F", "sex_M")

In [None]:
df1.show(10)

+----+----------+-----------+-----+-----+-----+
|year|first_name|     county|count|sex_F|sex_M|
+----+----------+-----------+-----+-----+-----+
|2013|     GAVIN|ST LAWRENCE|    9|    0|    1|
|2013|      LEVI|ST LAWRENCE|    9|    0|    1|
|2013|     LOGAN|   NEW YORK|   44|    0|    1|
|2013|    HUDSON|   NEW YORK|   49|    0|    1|
|2013|   GABRIEL|   NEW YORK|   50|    0|    1|
|2013|  THEODORE|   NEW YORK|   51|    0|    1|
|2013|     ELIZA|      KINGS|   16|    1|    0|
|2013| MADELEINE|      KINGS|   16|    1|    0|
|2013|      ZARA|      KINGS|   16|    1|    0|
|2013|     DAISY|      KINGS|   16|    1|    0|
+----+----------+-----------+-----+-----+-----+
only showing top 10 rows



In [None]:
df1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- county: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- sex_F: byte (nullable = true)
 |-- sex_M: byte (nullable = true)



In [None]:
df1.rollup("sex_F").agg(sum("count").alias("total_babies_per_sex"))\
  .withColumn('perc_total_babies_per_sex', F.round((F.col('total_babies_per_sex') / total_babies ) * 100, 2)) \
  .withColumn('perc_total_babies_per_sex',F.concat(F.col('perc_total_babies_per_sex').cast(StringType()),F.lit('%')))\
  .show()

+-----+--------------------+-------------------------+
|sex_F|total_babies_per_sex|perc_total_babies_per_sex|
+-----+--------------------+-------------------------+
|    1|              397693|                   42.15%|
|    0|              545735|                   57.85%|
| null|              943428|                   100.0%|
+-----+--------------------+-------------------------+



# **Hadoop**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!wget https://downloads.apache.org/hadoop/common/hadoop-3.3.2/hadoop-3.3.2.tar.gz
!tar -xzvf hadoop-3.3.2.tar.gz
!cp -r hadoop-3.3.2/ /usr/local/
!readlink -f /usr/bin/java | sed "s:bin/java::"

To set java path, go to /usr/local/hadoop-3.3.2/etc/hadoop/hadoop-env.sh then

. . . export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/ . . .

In [None]:
%%file omg_mapper.py
#!/usr/bin/env python
import sys
import re

patterns = ["omg", "oh-my-god", "oh my god"]
# We will convert each line to lower case

for line in sys.stdin:
  try:
    # format of each line - Ross: Hey there!
    line = line.strip().lower() # ross: hey there!
    sentence = line.split(":")[1]
    for pattern in patterns:
      if re.search(pattern, sentence):
        speaker = line.split(":")[0].title() # Ross
        print('%s\t%s' % (speaker, 1))
  except:
    continue

Overwriting omg_mapper.py


In [None]:
%%file omg_reducer.py
#!/usr/bin/env python
import sys

current_speaker = None
current_count = 0
speaker = None

for line in sys.stdin:
    line = line.strip()
    try:
        speaker, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        continue
        
    if current_speaker == speaker:
        current_count += count
    else:
        if current_speaker:
            print('%s\t%s' % (current_speaker, current_count))
        current_count = count
        current_speaker = speaker

if current_speaker == speaker:
    print('%s\t%s' % (current_speaker, current_count))

Overwriting omg_reducer.py


In [None]:
!chmod +x omg_reducer.py 
!chmod +x omg_mapper.py 

In [None]:
%%file Makefile

HADOOP_VERSION=3.3.2
HADOOP_HOME=/export/hadoop-${HADOOP_VERSION}
HADOOP_TOOLS=usr/local/hadoop-3.3.2/share/hadoop/tools/lib
HADOOP_JAR = "/content/hadoop-3.3.2/share/hadoop/tools/lib/hadoop-streaming-3.3.2.jar"
HDFS_DIR=/content/${USER}
 
SAMPLES = ./drive/MyDrive/FriendsEpisodes/*.txt

copy_to_hdfs: ${SAMPLES}
	/usr/local/hadoop-3.3.2/bin/hdfs dfs -mkdir -p ${HDFS_DIR}/friends
	/usr/local/hadoop-3.3.2/bin/hdfs dfs -put $^ ${HDFS_DIR}/friends

run_with_hadoop: 
	/usr/local/hadoop-3.3.2/bin/hadoop jar ${HADOOP_JAR} \
    -file  ${PWD}/omg_mapper.py  -mapper  ${PWD}/omg_mapper.py \
    -file  ${PWD}/omg_reducer.py -reducer ${PWD}/omg_reducer.py \
    -input ${HDFS_DIR}/friends/*.txt -output ${HDFS_DIR}/omg_distribution

Overwriting Makefile


In [None]:
%%bash
/usr/local/hadoop-3.3.2/bin/hdfs dfs -rm -r friends  # remove input directory

In [None]:
%%bash
make copy_to_hdfs # copy sample files to hdfs

/usr/local/hadoop-3.3.2/bin/hdfs dfs -mkdir -p /content//friends
/usr/local/hadoop-3.3.2/bin/hdfs dfs -put drive/MyDrive/FriendsEpisodes/1008.txt drive/MyDrive/FriendsEpisodes/314.txt drive/MyDrive/FriendsEpisodes/607.txt drive/MyDrive/FriendsEpisodes/214.txt drive/MyDrive/FriendsEpisodes/605.txt drive/MyDrive/FriendsEpisodes/114.txt drive/MyDrive/FriendsEpisodes/407.txt drive/MyDrive/FriendsEpisodes/602.txt drive/MyDrive/FriendsEpisodes/307.txt drive/MyDrive/FriendsEpisodes/207.txt drive/MyDrive/FriendsEpisodes/619.txt drive/MyDrive/FriendsEpisodes/518.txt drive/MyDrive/FriendsEpisodes/1002.txt drive/MyDrive/FriendsEpisodes/418.txt drive/MyDrive/FriendsEpisodes/319.txt drive/MyDrive/FriendsEpisodes/219.txt drive/MyDrive/FriendsEpisodes/118.txt drive/MyDrive/FriendsEpisodes/1007.txt drive/MyDrive/FriendsEpisodes/423.txt drive/MyDrive/FriendsEpisodes/911.txt drive/MyDrive/FriendsEpisodes/918.txt drive/MyDrive/FriendsEpisodes/323.txt drive/MyDrive/FriendsEpisodes/1013.txt drive/MyDrive/F

In [None]:
%%bash
/usr/local/hadoop-3.3.2/bin/hdfs dfs -ls friends # list files on hdfs

In [None]:
%%bash
/usr/local/hadoop-3.3.2/bin/hdfs dfs -rm -r -f omg_distribution # Remove output directory on hdfs

Deleted omg_distribution


2022-05-25 19:36:20,178 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum


In [None]:
%%bash
make run_with_hadoop  # Run the hadoop streaming

In [None]:
import operator
dist = {}
with open("omg_distribution/part-00000", "r") as file:
  for line in file:
    speaker, count = line.split('\t')
    dist[speaker] = int(count)
results = reversed(sorted(dist.items(), key=operator.itemgetter(1)))
results = list(results)
print(results)

[('Rachel', 203), ('Monica', 181), ('Phoebe', 152), ('Ross', 90), ('Chandler', 67), ('Joey', 52), ('Emily', 6), ('All', 5), ('Rach', 4), ('Mona', 4), ('Janice', 4), ('Mr. Geller', 3), ('Jill', 3), ('Mrs. Potter', 2), ('Mrs. Geller', 2), ('Kathy', 2), ('Julie', 2), ('Joshua', 2), ('Frank', 2), ('Dennis Phillips', 2), ('Charlie', 2), ('Carol', 2), ('Amy', 2), ('Woman No. 1', 1), ('Woman', 1), ('Tim', 1), ('The Cooking Teacher', 1), ('Tag', 1), ('Store Guy', 1), ('Sarah', 1), ('Phoebe And Joey', 1), ('Nina', 1), ('Mrs. Waltham', 1), ('Mrs Green', 1), ('Mrs Buffay', 1), ('Monica And Phoebe', 1), ('Mnca', 1), ('Mindy', 1), ('Mike (To The Charity Guy)', 1), ('Melissa', 1), ('Luisa', 1), ('Lowell', 1), ('Lizzie', 1), ('Lisa', 1), ('Laura', 1), ('Kim', 1), ('Jessica Ashley', 1), ('Guest #2', 1), ('Frank Sr.', 1), ('Everybody', 1), ('Eric', 1), ('Elizabeth', 1), ('Dina', 1), ('Cynthia', 1), ('Cliff', 1), ('Chan', 1), ('Casting Director #1', 1), ('Cassie', 1), ('Caitlin', 1), ('Aurora', 1), ('Al

In [None]:
with open('omg_distribution.txt', 'w') as file:
  for result in results:
    file.write("%s\t%s\n" % (result[0], result[1]))