In [1]:
import os
import sys
import numpy as np
import h5py
from pyspark import SparkContext, SparkConf
import time
import math
import subprocess
from operator import add

In [2]:
from pyspark.sql import SparkSession

spark_session = SparkSession\
.builder\
.master("spark://192.168.2.92:7077")\
.appName("test_h5_2")\
.config("spark.dynamicAllocation.enabled", False)\
.config("spark.shuffle.service.enabled", False)\
.config("spark.dynamicAllocation.executorIdleTimeout","30s")\
.config("spark.executor.cores",2)\
.config("spark.driver.host", "192.168.2.92")\
.getOrCreate()\

#        .config("spark.executor.memory",471859200)\
# Old API (RDD)
spark_context = spark_session.sparkContext

In [3]:
spark_context

In [4]:
# Creation of the list of decades we want, each decade (line) has a name, a starting year and an end year
list_decades=[["1920's",1920,1929],
              ["1930's",1930,1939],
              ["1940's",1940,1949],
              ["1950's",1950,1959],
              ["1960's",1960,1969],
              ["1970's",1970,1979],
              ["1980's",1980,1989],
              ["1990's",1990,1999],
              ["2000's",2000,2009],
             ["2010's",2010,2019]]

# Function to get the decade, returns the artist name and the decade
def get_decade(line):
    year=int(re.split(',', line)[17])
    for decade in list_decades:
        if year >= decade[1] and year <= decade[2]:
            return (re.split(',', line)[8],decade[0])

In [5]:
#FOR ALL THE DATA PRESENT IN SINGLE CSV FILE
import re
start_time=time.time()
rdd_one_csv = spark_context.textFile('hdfs://sai-master:9820/SongCSV.csv')
header = rdd_one_csv.first()
rdd_csv = rdd_one_csv.filter(lambda line : line != header)
rdd_csv_filtered = rdd_csv.filter(lambda row: int(re.split(',', row)[17]) != 0)
rdd_csv_decade=rdd_csv_filtered.map(get_decade)
rdd_csv_mapped=rdd_csv_decade.map(lambda row: (row,1))
rdd_csv_reduced=rdd_csv_mapped.reduceByKey(add)
list_top3=[]
for decade in list_decades:
    top3=rdd_csv_reduced.filter(lambda row: row[0][1] == decade[0]).takeOrdered(3, key=lambda x: -x[1])
    for artist in top3:
        list_top3.append((artist[0][1],artist[0][0],artist[1]))

# Visualisation of the output
# Feel free to change it if you find another way to represent the output
for decade in list_decades:
    print("####################################")
    print("Top 3 artist of ",decade[0],":")
    for artist in list_top3:
        l_a_d=[]
        if artist[0]==decade[0]:
            l_a_d.append([artist[1],artist[2]])
        for a in l_a_d:
            print("- ",a[0],"\t",a[1])


total_time=time.time()-start_time
print('Time taken for the execution: {}m {}s'.format(
        math.floor(total_time / 60),
        math.floor(total_time % 60),
    ))

####################################
Top 3 artist of  1920's :
-  "b'Blind Lemon Jefferson'" 	 3
-  "b'Blind Willie McTell'" 	 1
-  "b'Charley Patton'" 	 1
####################################
Top 3 artist of  1930's :
-  "b'Sleepy John Estes'" 	 3
-  "b'Charley Patton'" 	 1
-  "b'Red Foley'" 	 1
####################################
Top 3 artist of  1940's :
-  "b'Tex Williams'" 	 2
-  "b'Bukka White'" 	 2
####################################
Top 3 artist of  1950's :
-  "b'Jackie Wilson'" 	 5
-  "b'Georges Brassens'" 	 3
-  "b'Don Gibson'" 	 2
####################################
Top 3 artist of  1960's :
-  "b'Small Faces'" 	 5
-  "b'Jacques Dutronc'" 	 5
-  "b'The Box Tops'" 	 4
####################################
Top 3 artist of  1970's :
-  "b'Bad Company'" 	 6
-  "b'Michael Jackson'" 	 5
-  "b'The New York Dolls'" 	 4
####################################
Top 3 artist of  1980's :
-  "b'RUN-DMC'" 	 7
-  "b'Michael Stanley Band'" 	 6
-  "b'Thomas Dolby'" 	 5
######################

In [6]:
spark_context.stop()

# ANALYSIS WITH MULTIPLE CSV FILES

In [7]:
from pyspark.sql import SparkSession

spark_session = SparkSession\
.builder\
.master("spark://192.168.2.92:7077")\
.appName("test_h5_2")\
.config("spark.dynamicAllocation.enabled", False)\
.config("spark.shuffle.service.enabled", False)\
.config("spark.dynamicAllocation.executorIdleTimeout","30s")\
.config("spark.executor.cores",2)\
.config("spark.driver.host", "192.168.2.92")\
.getOrCreate()\

#        .config("spark.executor.memory",471859200)\
# Old API (RDD)
spark_context = spark_session.sparkContext
spark_context

In [11]:
rdd_text = spark_context.textFile("hdfs://192.168.2.92:9820/csv_dataset/*.csv")

In [12]:
# Creation of the list of decades we want, each decade (line) has a name, a starting year and an end year
list_decades=[["1920's",1920,1929],
              ["1930's",1930,1939],
              ["1940's",1940,1949],
              ["1950's",1950,1959],
              ["1960's",1960,1969],
              ["1970's",1970,1979],
              ["1980's",1980,1989],
              ["1990's",1990,1999],
              ["2000's",2000,2009],
             ["2010's",2010,2019]]

# Function to get the decade, returns the artist name and the decade
def get_decade(line):
    year=int(re.split(',', line)[17])
    for decade in list_decades:
        if year >= decade[1] and year <= decade[2]:
            return (re.split(',', line)[8],decade[0])

In [13]:
import re
start_time=time.time()
rdd_text = spark_context.textFile("hdfs://192.168.2.92:9820/csv_dataset/*.csv")
#rdd_one_csv = spark_context.textFile('hdfs://sai-master:9820/SongCSV.csv')
rdd_text_filtered = rdd_text.filter(lambda row: int(re.split(',', row)[17]) != 0)
rdd_text_decade= rdd_text_filtered.map(get_decade)
rdd_text_mapped= rdd_text_decade.map(lambda row: (row,1))
rdd_text_reduced= rdd_text_mapped.reduceByKey(add)
list_top3=[]
for decade in list_decades:
    top3=rdd_text_reduced.filter(lambda row: row[0][1] == decade[0]).takeOrdered(3, key=lambda x: -x[1])
    for artist in top3:
        list_top3.append((artist[0][1],artist[0][0],artist[1]))

# Visualisation of the output
# Feel free to change it if you find another way to represent the output
for decade in list_decades:
    print("####################################")
    print("Top 3 artist of ",decade[0],":")
    for artist in list_top3:
        l_a_d=[]
        if artist[0]==decade[0]:
            l_a_d.append([artist[1],artist[2]])
        for a in l_a_d:
            print("- ",a[0],"\t",a[1])


total_time=time.time()-start_time
print('Time taken for the execution: {}m {}s'.format(
        math.floor(total_time / 60),
        math.floor(total_time % 60),
    ))

####################################
Top 3 artist of  1920's :
-  "b'Blind Lemon Jefferson'" 	 3
-  "b'Ma Rainey'" 	 1
-  "b'Charley Patton'" 	 1
####################################
Top 3 artist of  1930's :
-  "b'Sleepy John Estes'" 	 3
-  "b'Charley Patton'" 	 1
-  "b'Red Foley'" 	 1
####################################
Top 3 artist of  1940's :
-  "b'Tex Williams'" 	 2
-  "b'Bukka White'" 	 2
####################################
Top 3 artist of  1950's :
-  "b'Jackie Wilson'" 	 5
-  "b'Georges Brassens'" 	 3
-  "b'Don Gibson'" 	 2
####################################
Top 3 artist of  1960's :
-  "b'Small Faces'" 	 5
-  "b'Jacques Dutronc'" 	 5
-  "b'The Shangri-Las'" 	 4
####################################
Top 3 artist of  1970's :
-  "b'Bad Company'" 	 6
-  "b'Michael Jackson'" 	 5
-  "b'Hot Tuna'" 	 4
####################################
Top 3 artist of  1980's :
-  "b'RUN-DMC'" 	 7
-  "b'Michael Stanley Band'" 	 6
-  "b'Black Flag'" 	 5
####################################
Top 

In [None]:
import re
start_time=time.time()
rdd_text = spark_context.textFile("hdfs://192.168.2.92:9820/csv_dataset/*.csv")
#rdd_one_csv = spark_context.textFile('hdfs://sai-master:9820/SongCSV.csv')
rdd_text_filtered = rdd_text.filter(lambda row: int(re.split(',', row)[17]) != 0)
rdd_text_decade= rdd_text_filtered.map(get_decade)
rdd_text_mapped= rdd_text_decade.map(lambda row: (row,1))
rdd_text_reduced= rdd_text_mapped.reduceByKey(add)
list_top3=[]
count = 0
for decade in list_decades:
    if (count <= 2):
        count = count + 1
        top3=rdd_text_reduced.filter(lambda row: row[0][1] == decade[0]).takeOrdered(3, key=lambda x: -x[1])
        for artist in top3:
            list_top3.append((artist[0][1],artist[0][0],artist[1]))
    else:
        break

# Visualisation of the output
# Feel free to change it if you find another way to represent the output
for decade in list_decades:
    print("####################################")
    print("Top 3 artist of ",decade[0],":")
    for artist in list_top3:
        l_a_d=[]
        if artist[0]==decade[0]:
            l_a_d.append([artist[1],artist[2]])
        for a in l_a_d:
            print("- ",a[0],"\t",a[1])


total_time=time.time()-start_time
print('Time taken for the execution: {}m {}s'.format(
        math.floor(total_time / 60),
        math.floor(total_time % 60),
    ))

In [14]:
spark_context.stop()