## Ensure Spark is running

In [4]:
print(sc.version)


3.2.4


In [5]:
# Check the master URL of the SparkContext
print(sc.master)

local[*]


# Previous steps (Terminal commands)

## A. Run Hadoop using an extended hdfs script created by me using Muhammed's script as a base

./hdfs_extended


## B. Create a new folder in Hadoop File Sytem called "CA1"

hadoop fs -mkdir /CA1


## C. Copy my dataset with TAB separators instead of commas to the folder I've just created

hadoop fs -put /media/sf_CA1/lyrics-dataTAB.csv /CA1


## D. Ensure it is there

hadoop fs -ls /CA1


## E. Run a script I created to replace all double quote characters to single quote characters and create a new file called "lyrics-dataTAB_fixed.csv"

Code for the script:

#!/bin/bash

#Define your HDFS file paths
input_path="/CA1/lyrics-dataTAB.csv"
output_path="/CA1/lyrics-dataTAB_fixed.csv"

#Check if the output file already exists and remove it to avoid errors

hdfs dfs -test -e $output_path && hdfs dfs -rm $output_path

#Use hdfs dfs -cat to output the file content, then sed to replace " with ' and then use hdfs dfs -put to store the result back into HDFS

hdfs dfs -cat $input_path | sed "s/\"/'/g" | hdfs dfs -put - $output_path


Terminal code to tun it:

./scriptToReplaceDoubleQuotesWithSingleQuotes

# DATA-PREPROCESSING with Spark over file in Hadoop File System

# 01. Load csv with tab separator from hadoop filesystem

In [6]:
# Read a CSV file into a DataFrame
df = spark.read.csv("hdfs:///CA1/lyrics-dataTAB_fixed.csv", sep='\t', header=True, multiLine = True, escape = "\n")

df.show(20)

# Count the number of rows
row_count = df.count()

# Show the row count
print(f"The total number of rows is: {row_count}")

                                                                                

+---------------+--------------------+--------------------+--------------------+--------+
|          ALink|               SName|               SLink|               Lyric|language|
+---------------+--------------------+--------------------+--------------------+--------+
|/ivete-sangalo/|               Arerê|/ivete-sangalo/ar...|'Tudo o que eu qu...|      pt|
|/ivete-sangalo/|Se Eu Não Te Amas...|/ivete-sangalo/se...|'Meu coração\nSem...|      pt|
|/ivete-sangalo/|         Céu da Boca|/ivete-sangalo/ch...|'É de babaixá!\nÉ...|      pt|
|/ivete-sangalo/|Quando A Chuva Pa...|/ivete-sangalo/qu...|'Quando a chuva p...|      pt|
|/ivete-sangalo/|        Sorte Grande|/ivete-sangalo/so...|'A minha sorte gr...|      pt|
|/ivete-sangalo/|    A Lua Q Eu T Dei|/ivete-sangalo/a-...|'Posso te falar d...|      pt|
|/ivete-sangalo/|Mulheres Não Têm ...|/ivete-sangalo/mu...|'Hey, girl\nLevan...|      pt|
|/ivete-sangalo/|Eva / Alô Paixão ...|/ivete-sangalo/ev...|'''EVA''\n(Gianca...|      pt|
|/ivete-sa

[Stage 2:>                                                          (0 + 1) / 1]

The total number of rows is: 389622


                                                                                

# 02. Drop rows that contain some null value

In [7]:
# Drop rows that have any null values
df = df.dropna()

# Count the number of rows
row_count = df.count()

# Show the row count
print(f"The total number of rows is: {row_count}")

[Stage 5:>                                                          (0 + 1) / 1]

The total number of rows is: 364924


                                                                                

# 03. Drop duplicates

In [8]:
# To drop identical rows across all columns
df_no_identical = df.distinct()

# Count the number of rows
row_count = df.count()

# Show the row count
print(f"The total number of rows is: {row_count}")

[Stage 8:>                                                          (0 + 1) / 1]

The total number of rows is: 364924


                                                                                

# 04. Replace newline codes (\n) with spaces in Lyric column

In [9]:
from pyspark.sql.functions import regexp_replace

# Replace newline characters with a space in the "lyric" column
df = df.withColumn("lyric", regexp_replace("lyric", "\n", " "))

# Count the number of rows
row_count = df.count()

# Show the row count
print(f"The total number of rows is: {row_count}")

[Stage 11:>                                                         (0 + 1) / 1]

The total number of rows is: 364924


                                                                                

# 05. Drop rows with language codes with more than 2 characters

In [10]:
from pyspark.sql.functions import length


# Filter rows to keep only those where "language" column has <= 2 characters
df = df.filter(length(df.language) <= 2)

# Show the filtered DataFrame (optional)
df.show()

# Count the number of rows
row_count = df.count()

# Show the row count
print(f"The number of rows is: {row_count}")

+---------------+--------------------+--------------------+--------------------+--------+
|          ALink|               SName|               SLink|               lyric|language|
+---------------+--------------------+--------------------+--------------------+--------+
|/ivete-sangalo/|               Arerê|/ivete-sangalo/ar...|'Tudo o que eu qu...|      pt|
|/ivete-sangalo/|Se Eu Não Te Amas...|/ivete-sangalo/se...|'Meu coração Sem ...|      pt|
|/ivete-sangalo/|         Céu da Boca|/ivete-sangalo/ch...|'É de babaixá! É ...|      pt|
|/ivete-sangalo/|Quando A Chuva Pa...|/ivete-sangalo/qu...|'Quando a chuva p...|      pt|
|/ivete-sangalo/|        Sorte Grande|/ivete-sangalo/so...|'A minha sorte gr...|      pt|
|/ivete-sangalo/|    A Lua Q Eu T Dei|/ivete-sangalo/a-...|'Posso te falar d...|      pt|
|/ivete-sangalo/|Mulheres Não Têm ...|/ivete-sangalo/mu...|'Hey, girl Levant...|      pt|
|/ivete-sangalo/|Eva / Alô Paixão ...|/ivete-sangalo/ev...|'''EVA'' (Giancar...|      pt|
|/ivete-sa

[Stage 15:>                                                         (0 + 1) / 1]

The number of rows is: 364922


                                                                                

# 06. Add a new column to the DataFrame with unique row identifiers (IDs)

In [12]:
from pyspark.sql.functions import monotonically_increasing_id

# Add a new column 'id' with unique row IDs
df = df.withColumn("id", monotonically_increasing_id())

# Show the result
df.show()

+---------------+--------------------+--------------------+--------------------+--------+---+
|          ALink|               SName|               SLink|               lyric|language| id|
+---------------+--------------------+--------------------+--------------------+--------+---+
|/ivete-sangalo/|               Arerê|/ivete-sangalo/ar...|'Tudo o que eu qu...|      pt|  0|
|/ivete-sangalo/|Se Eu Não Te Amas...|/ivete-sangalo/se...|'Meu coração Sem ...|      pt|  1|
|/ivete-sangalo/|         Céu da Boca|/ivete-sangalo/ch...|'É de babaixá! É ...|      pt|  2|
|/ivete-sangalo/|Quando A Chuva Pa...|/ivete-sangalo/qu...|'Quando a chuva p...|      pt|  3|
|/ivete-sangalo/|        Sorte Grande|/ivete-sangalo/so...|'A minha sorte gr...|      pt|  4|
|/ivete-sangalo/|    A Lua Q Eu T Dei|/ivete-sangalo/a-...|'Posso te falar d...|      pt|  5|
|/ivete-sangalo/|Mulheres Não Têm ...|/ivete-sangalo/mu...|'Hey, girl Levant...|      pt|  6|
|/ivete-sangalo/|Eva / Alô Paixão ...|/ivete-sangalo/ev...|'

# 07. Save "Lyrics-dataForHbase.csv" with tab separator to hadoop filesystem

In [13]:
# Write the limited DataFrame to a new CSV file
df.write.option("header", "true").option("sep", "\t").csv("hdfs:///CA1/lyrics-dataForHbase.csv")


                                                                                

# 08. Import preprocessed file from Hadoop into HBase (Terminal commands)

## 8.1 Ensure "lyrics-dataForHbase.csv" is in hadoop filesystem

hadoop fs -ls /CA1

## 8.2 Run hbase shell

/usr/local/hbase/bin/hbase shell

## 8.3 Create "Lyrics" table in hbase shell

create 'LyricsTEST', 'cf'

## 8.4 Import "lyrics-dataForHbase.csv" from hadoop filesystem to HBase

bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.columns=cf:ALink,cf:SName,cf:SLink,cf:Lyric,cf:language,HBASE_ROW_KEY \
-Dmapreduce.input.fileinputformat.input.charset=UTF-8 \
Lyrics hdfs:///CA1/lyrics-dataForHbase.csv

## 8.5 Run Thrift server (needed for Happy Base)

/usr/local/hbase/bin/hbase thrift start

# 09. Access HBase from notebook using HappyBase python library

In [100]:
import happybase
import pandas as pd

# Connect to HBase
connection = happybase.Connection('localhost')
table = connection.table('lyricsTEST')

# Extract all rows from HBase into a list
rows = [data for _, data in table.scan()]

# Construct DataFrame
df = pd.DataFrame(rows)

# Convert bytes columns to string
for column in df.columns:
    df[column] = df[column].str.decode('utf-8')

df.head(50)


IOError: IOError(message=b'org.apache.hadoop.hbase.TableNotFoundException: lyricsTEST\n\tat org.apache.hadoop.hbase.client.ConnectionImplementation.locateRegionInMeta(ConnectionImplementation.java:989)\n\tat org.apache.hadoop.hbase.client.ConnectionImplementation.locateRegion(ConnectionImplementation.java:866)\n\tat org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:319)\n\tat org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:157)\n\tat org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:53)\n\tat org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithoutRetries(RpcRetryingCallerImpl.java:187)\n\tat org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:266)\n\tat org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:435)\n\tat org.apache.hadoop.hbase.client.ClientScanner.nextWithSyncCache(ClientScanner.java:309)\n\tat org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:594)\n\tat org.apache.hadoop.hbase.client.ResultScanner.next(ResultScanner.java:94)\n\tat org.apache.hadoop.hbase.thrift.ThriftHBaseServiceHandler.scannerGetList(ThriftHBaseServiceHandler.java:814)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy.invoke(HbaseHandlerMetricsProxy.java:63)\n\tat com.sun.proxy.$Proxy10.scannerGetList(Unknown Source)\n\tat org.apache.hadoop.hbase.thrift.generated.Hbase$Processor$scannerGetList.getResult(Hbase.java:5093)\n\tat org.apache.hadoop.hbase.thrift.generated.Hbase$Processor$scannerGetList.getResult(Hbase.java:5072)\n\tat org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)\n\tat org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:38)\n\tat org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer$ClientConnnection.run(TBoundedThreadPoolServer.java:279)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\n')

## ******************************************************************************************************

## ******************************************************************************************************

## Appendix 01. Save a reduced dataset to work with before doing the big importing to hbase

In [70]:
# Limit the DataFrame to the first 20 rows
df_limit = df.limit(20)

# Write the limited DataFrame to a new CSV file
#df_limit.write.option("header", "true").option("sep", "\t").csv("hdfs:///CA1/lyrics-dataQUO20tab.csv")


## Appendix 02. Find unique values for column "Language" and number of songs for each

In [90]:
from pyspark.sql.functions import col, lit, concat_ws

# Define the language mapping data
language_mapping_data = [
    ("en", "English"),
    ("vi", "Vietnamese"),
    ("ro", "Romanian"),
    ("lv", "Latvian"),
    ("pl", "Polish"),
    ("st", "Sotho"),
    ("pt", "Portuguese"),
    ("gl", "Galician"),
    ("tl", "Tagalog"),
    ("sw", "Swahili"),
    ("ko", "Korean"),
    ("ms", "Malay"),
    ("cs", "Czech"),
    ("mg", "Malagasy"),
    ("sr", "Serbian"),
    ("tr", "Turkish"),
    ("de", "German"),
    ("is", "Icelandic"),
    ("es", "Spanish"),
    ("hr", "Croatian"),
    ("eu", "Basque"),
    ("lg", "Ganda"),
    ("it", "Italian"),
    ("af", "Afrikaans"),
    ("ku", "Kurdish"),
    ("su", "Sundanese"),
    ("ar", "Arabic"),
    ("sv", "Swedish"),
    ("nl", "Dutch"),
    ("rw", "Kinyarwanda"),
    ("hu", "Hungarian"),
    ("ca", "Catalan"),
    ("ru", "Russian"),
    ("iw", "Hebrew"),
    ("ga", "Irish"),
    ("ht", "Haitian Creole"),
    ("no", "Norwegian"),
    ("fa", "Persian"),
    ("cy", "Welsh"),
    ("et", "Estonian"),
    ("zh", "Chinese"),
    ("fr", "French"),
    ("ja", "Japanese"),
    ("gd", "Scottish Gaelic"),
    ("id", "Indonesian"),
    ("ny", "Chichewa"),
    ("da", "Danish"),
    ("sq", "Albanian"),
    ("fi", "Finnish"),
    ("jw", "Javanese"),
    # Add more mappings here
]

# Create a DataFrame from the language mapping data
language_mapping_df = spark.createDataFrame(language_mapping_data, ["code", "language_name"])

# Alias the DataFrames to avoid column name ambiguities
df_alias = df.alias("df")
language_mapping_df_alias = language_mapping_df.alias("lm")

# Join the DataFrames on the language code
df_with_language_names = df_alias.join(language_mapping_df_alias, col("df.language") == col("lm.code"))

# Select the necessary columns using the correct DataFrame alias
df_with_language_names = df_with_language_names.select(
    col("df.ALink"),
    col("df.SName"),
    col("df.SLink"),
    col("df.lyric"),
    col("lm.code").alias("language_code"),
    col("lm.language_name")
)

# Count the number of songs per language, and order by this count
language_song_counts = (df_with_language_names
                        .groupBy("language_code", "language_name")
                        .count()
                        .orderBy(col("count").desc()))

# Show the result, concatenating the strings to match the required format
language_song_counts.select(
    concat_ws(" ", 
              col("language_name"), 
              concat_ws("", lit("("), col("language_code"), lit(")")), 
              concat_ws("", col("count"), lit(" songs"))
    ).alias("result")
).show(100, truncate=False)

[Stage 210:>                                                        (0 + 1) / 1]

+----------------------------+
|result                      |
+----------------------------+
|English (en) 191576 songs   |
|Portuguese (pt) 157276 songs|
|Spanish (es) 9905 songs     |
|Kinyarwanda (rw) 1678 songs |
|Italian (it) 1431 songs     |
|French (fr) 1222 songs      |
|German (de) 844 songs       |
|Finnish (fi) 145 songs      |
|Swedish (sv) 112 songs      |
|Romanian (ro) 97 songs      |
|Norwegian (no) 89 songs     |
|Icelandic (is) 86 songs     |
|Tagalog (tl) 69 songs       |
|Polish (pl) 47 songs        |
|Galician (gl) 36 songs      |
|Turkish (tr) 32 songs       |
|Irish (ga) 32 songs         |
|Indonesian (id) 26 songs    |
|Welsh (cy) 23 songs         |
|Afrikaans (af) 19 songs     |
|Swahili (sw) 19 songs       |
|Sundanese (su) 19 songs     |
|Korean (ko) 17 songs        |
|Dutch (nl) 14 songs         |
|Estonian (et) 13 songs      |
|Danish (da) 13 songs        |
|Catalan (ca) 13 songs       |
|Malay (ms) 8 songs          |
|Japanese (ja) 7 songs       |
|Sotho (

                                                                                