In [None]:
# Installing required packages
# !pip install pyspark
# !pip install findspark
# !pip install pandas

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats
from matplotlib import pyplot as plt

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from sklearn.metrics import accuracy_score

In [None]:
# TASK:

# The age_salary_hours dataset provides key information about individuals’ demographics and
# employment status, including age, yearly income, weekly working hours, and educational background.



# The dataset provided contains 4 columns:

# • Age: Represents the age of individuals.
# • Annual Salary: Indicates the annual salary of individuals.
# • Weekly hours: Denotes the number of weekly working hours of individuals.
# • Education: Represents the education level of individuals.



# Using SparkSQL determine the 5 age groups with the highest average annual salary.

In [None]:
# Load the age / salary / weekly-hours dataset into a pandas DataFrame.
age_salary_csv = 'age_salary_hours.csv'
df = pd.read_csv(age_salary_csv)

In [None]:
# Preview the first five rows of the pandas DataFrame.
df.head(n=5)

Unnamed: 0,Age,Annual Salary,Weekly hours,Education
0,72,160000.0,40.0,Bachelor's degree or higher
1,72,100000.0,50.0,Bachelor's degree or higher
2,31,120000.0,40.0,Bachelor's degree or higher
3,28,45000.0,40.0,Bachelor's degree or higher
4,54,85000.0,40.0,Bachelor's degree or higher


In [None]:
# Create (or reuse) the notebook-wide Spark session.
# `getOrCreate()` returns the already-running session when one exists, so this
# cell is safe to re-run and no explicit `SparkContext()` has to be created
# (or commented in and out) by hand.
spark = (
    SparkSession.builder
    .appName("Age Salary Hours")
    .getOrCreate()
)

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame.
sdf = spark.createDataFrame(data=df)

In [None]:
from pyspark.sql.functions import avg, desc

In [None]:
# Rebuild the Spark DataFrame from the pandas data, then replace the spaces in
# the column names with underscores (e.g. "Annual Salary" -> "Annual_Salary").
sdf = spark.createDataFrame(df)

new_columns = [column_name.replace(' ', '_') for column_name in sdf.columns]
sdf = sdf.toDF(*new_columns)

In [None]:
# Print the Spark DataFrame's schema (column names, types and nullability).
sdf.printSchema()

root
 |-- Age: long (nullable = true)
 |-- Annual_Salary: double (nullable = true)
 |-- Weekly_hours: double (nullable = true)
 |-- Education: string (nullable = true)



In [None]:
# Group by age and compute the average annual salary for each age,
# ordered from the highest average salary to the lowest.
age_avg_salary = (
    sdf.groupBy("Age")
       .agg(avg("Annual_Salary").alias("avg_salary"))
       .orderBy(desc("avg_salary"))
)

# Display the 5 ages with the highest average annual salary.
age_avg_salary.show(5)

+---+------------------+
|Age|        avg_salary|
+---+------------------+
| 53|129444.44444444444|
| 37|          120750.0|
| 48|          119000.0|
| 57|117285.71428571429|
| 45|112444.44444444444|
+---+------------------+
only showing top 5 rows



In [None]:
# Register the Spark DataFrame as the temporary view "people" so it can be
# queried with Spark SQL.
sdf.createOrReplaceTempView(name="people")

In [None]:
# Top 5 ages by average annual salary, computed with Spark SQL.
# `res` holds the query result (a DataFrame). `.show()` only prints and
# returns None, so it is called separately instead of being assigned.
res = spark.sql(
    """
    SELECT Age, AVG(Annual_Salary) AS AVGSalary
    FROM people
    GROUP BY Age
    ORDER BY AVGSalary DESC
    LIMIT 5
    """
)
res.show()

+---+------------------+
|Age|         AVGSalary|
+---+------------------+
| 53|129444.44444444444|
| 37|          120750.0|
| 48|          119000.0|
| 57|117285.71428571429|
| 45|112444.44444444444|
+---+------------------+



------------------------------------------------------

In [None]:
# Gender classification involves classifying a person as male or female (binary classification). The idea is to
# determine whether certain features are enough to determine the gender of a person. You are required to
# use the DecisionTreeClassifier in the SparkML classification library.



# Few things to note about DecisionTreeClassifier:

# • It requires that the provided columns be named features and label specifically
# • Every column, including the label, must be numeric.

# The dataset you will use has 8 columns in total:
# [ ‘long_hair’, ‘forehead_width_cm’, ‘forehead_height_cm’, ‘nose_wide’, ‘nose_long’, ‘lips_thin’, ‘distance_nose_to_lip_long’, ‘gender’ ]

# • long_hair: this indicates whether this person has a long hair or not
# • forehead_width_cm: the width of the forehead from right to left given in cm
# • forehead_height_cm: the height of the forehead in cm, measured from where the hair grows to the eyebrows.
# • nose_wide: whether the nose is wide or not. 1 represents wide and 0 not
# • nose_long: whether the nose is long or not. 1 represents long and 0 not.
# • lips_thin: whether this person has a thin lip or not. 1 represents thin and 0 not.
# • distance_nose_to_lip_long: is the distance from nose to lip is long? 1 represents yes and 0 not
# • gender: Either Male or Female



# You will need to do any necessary featurization before applying the DecisionTreeClassifier
# on the data and calculating the accuracy.

In [None]:
# Load the gender-classification dataset into a pandas DataFrame.
genders_csv = 'genders.csv'
df = pd.read_csv(genders_csv)

In [None]:
# Preview the first five rows of the gender dataset.
df.head(n=5)

Unnamed: 0,long_hair,forehead_width_cm,forehead_height_cm,nose_wide,nose_long,lips_thin,distance_nose_to_lip_long,gender
0,1,11.8,6.1,1,0,1,1,Male
1,0,14.0,5.4,0,0,1,0,Female
2,0,11.8,6.3,1,1,1,1,Male
3,0,14.4,6.1,0,1,1,1,Male
4,1,13.5,5.9,0,0,0,0,Female


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Obtain the Spark session for the gender-classification task
# (an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("GenderClassification")
    .getOrCreate()
)

In [None]:
# Column names for the Spark DataFrame: the seven features followed by the target.
columns = [
    'long_hair',
    'forehead_width_cm',
    'forehead_height_cm',
    'nose_wide',
    'nose_long',
    'lips_thin',
    'distance_nose_to_lip_long',
    'gender',
]
# Replace the pandas DataFrame with a Spark DataFrame carrying those column names.
df = spark.createDataFrame(df, schema=columns)

In [None]:
# Input columns used as model features (everything except the 'gender' target).
feature_cols = [
    'long_hair',
    'forehead_width_cm',
    'forehead_height_cm',
    'nose_wide',
    'nose_long',
    'lips_thin',
    'distance_nose_to_lip_long',
]

In [None]:
# Pack the individual feature columns into one vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
df = assembler.transform(dataset=df)

In [None]:
# Encode the string-valued "gender" column as a numeric "label" column.
indexer = StringIndexer(outputCol="label", inputCol="gender")
indexer_model = indexer.fit(df)
df = indexer_model.transform(df)

In [None]:
# Randomly split the rows with weights 0.8 (training) / 0.2 (test), using a fixed seed.
train_data, test_data = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Configure the decision tree to read the "features" vector and the numeric "label".
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

In [None]:
# Fit the decision tree on the training split.
model = dt.fit(dataset=train_data)

In [None]:
# Apply the fitted model to the held-out test split.
predictions = model.transform(dataset=test_data)

In [None]:
# Score the predictions: accuracy of "prediction" against "label" on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.9625668449197861


In [None]:
# test_data still contains every column, including the gender column and its numeric label.
# The label is not used when predicting: model.transform() reads only the "features" vector,
# which was assembled from the other columns, and ignores the specified label column.

----------------------------

In [None]:
# Income prediction involves predicting an individual’s income based on various features. For this task, you
# are required to utilize the DecisionTreeClassifier from the SparkML classification library.




# Key considerations about DecisionTreeClassifier:

# • It requires that the provided columns be named features and label specifically.
# • All columns, including the label, must be numeric.





# You will need to do any necessary featurization before applying the DecisionTreeClassifier on the data and calculating the accuracy.






# --The dataset provided contains 15 columns:--


# • Age: Represents the age of individuals.

# • workclass: Specifies the type of workclass the individual belongs to.

# • fnlwgt: Denotes the final weight of the individual.

# • Education: Represents the education level of individuals.

# • educational-num: Denotes the educational number assigned to the individual.

# • marital-status: Specifies the marital status of the individual. Where numbers represent the corresponding status sequentially:
            # ’Never-married’, ’Married-civ-spouse’, ’Widowed’, ’Divorced’, ’Separated’, ’Married-spouse-absent’, ’Married-AF-spouse’

# • occupation: Represents the occupation. Where numbers represent the corresponding status sequentially:
            # ’Machine-op-inspct’, ’Farming-fishing’, ’Protective-serv’, ’ ?’,
            # ’Other-service’, ’Prof-specialty’, ’Craft-repair’, ’Adm-clerical’,
            # ’Exec-managerial’, ’Tech-support’, ’Sales’, ’Priv-house-serv’,
            # ’Transport-moving’, ’Handlers-cleaners’, ’Armed-Forces’.

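# • relationship: Specifies the relationship status. Where 3: ’Own-child’, 1: ’Husband’, 1: ’Not-in-family’, 4: ’Unmarried’, 5: ’Wife’, and 2: ’Other-relative’.

# • race: Where 3: ’Own-child’, 1: ’Husband’, 1: ’Not-in-family’, 4: ’Unmarried’, 5: ’Wife’, and 2: ’Other-relative’.
#   (NOTE: these values repeat the relationship mapping and the code 1 appears twice; this looks like an error in the task statement. Verify the actual codes against the dataset.)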

# • Gender: Where 0: ’Female’ and 1: ’Male’.

# • capital-gain: Represents the capital gain of the individual.

# • capital-loss: Denotes the capital loss of the individual.

# • hours-per-week: Specifies the number of hours per week the individual works.

# • native-country: Represents the native country of the individual.

# • income: Specifies the income level Where 0: ’<=50K’ and 1 : ’>50K’


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd

# Obtain the Spark session (an already-running session is reused)
spark = SparkSession.builder.appName("IncomePrediction").getOrCreate()

# Read the dataset with pandas and discard the 'Unnamed: 0' column
df = pd.read_csv('income_for_adult.csv').drop(columns=['Unnamed: 0'])

# Capitalise the three column names that differ from the names used below;
# every other column keeps its name.
df = df.rename(columns={'age': 'Age', 'education': 'Education', 'gender': 'Gender'})

# Hand the pandas data over to Spark
spark_df = spark.createDataFrame(df)

# Columns that are encoded as category indices ("<column>_index")
categorical_cols = [
    'workclass',
    'Education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'Gender',
    'native-country',
]

# Fit every indexer on the un-transformed frame first ...
indexers = []
for name in categorical_cols:
    stage = StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid='keep')
    indexers.append(stage.fit(spark_df))

# ... then append the index columns one after another.
for indexer in indexers:
    spark_df = indexer.transform(spark_df)

# Feature columns: raw numeric columns plus the index columns created above
feature_cols = [
    'Age', 'workclass_index', 'fnlwgt', 'Education_index',
    'educational-num', 'marital-status_index',
    'occupation_index', 'relationship_index',
    'race_index', 'Gender_index', 'capital-gain',
    'capital-loss', 'hours-per-week', 'native-country_index',
]

# Assemble the features into one vector column; rows with invalid (NaN) values are skipped
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols, handleInvalid="skip")
spark_df = assembler.transform(spark_df)

# Encode the income column as the numeric "label" column
indexer = StringIndexer(outputCol="label", inputCol="income")
label_model = indexer.fit(spark_df)
spark_df = label_model.transform(spark_df)

# Random split with weights 0.8 (training) / 0.2 (test) and a fixed seed
train_data, test_data = spark_df.randomSplit(weights=[0.8, 0.2], seed=42)

# Decision tree with maxBins raised to 50
# (presumably so the largest categorical feature fits; confirm against the data)
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label", maxBins=50)

# Fit on the training split and predict on the test split
model = dt.fit(train_data)
predictions = model.transform(test_data)

# Accuracy of "prediction" against "label" on the test split
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.8503834654881061


----------------------------------------

In [None]:
# Income prediction involves predicting an individual’s income based on various features. For this task,
# you are required to utilize the LogisticRegression from the SparkML classification library.




# Key considerations about LogisticRegression:
# • It requires that the provided columns be named features and label specifically.
# • All columns, including the label, must be numeric.





# Find the average weekly hours worked by individuals grouped by gender






# You will need to do any necessary featurization before applying the LogisticRegression to the data and calculating the accuracy.






# The dataset provided contains 15 columns:

# • Age: Represents the age of individuals.

# • workclass: Specifies the type of workclass the individual belongs to.

# • fnlwgt: Denotes the final weight of the individual.

# • Education: Represents the education level of individuals.

# • educational-num: Denotes the educational number assigned to the individual.

# • marital-status: Specifies the marital status of the individual. Where numbers represent the
#   corresponding status sequentially: ’Never-married’, ’Married-civ-spouse’, ’Widowed’, ’Divorced’,
#   ’Separated’, ’Married-spouse-absent’, ’Married-AF-spouse’

# • occupation: Represents the occupation. Where numbers represent the corresponding status sequentially:
#   ’Machine-op-inspct’, ’Farming-fishing’, ’Protective-serv’, ’ ?’, ’Other-service’, ’Prof-specialty’,
#   ’Craft-repair’, ’Adm-clerical’, ’Exec-managerial’, ’Tech-support’, ’Sales’, ’Priv-house-serv’,
#   ’Transport-moving’, ’Handlers-cleaners’, ’Armed-Forces’.

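# • relationship: Specifies the relationship status. Where 3: ’Own-child’, 1: ’Husband’, 1: ’Not-in-family’, 4: ’Unmarried’, 5: ’Wife’, and 2: ’Other-relative’.

# • race: Where 3: ’Own-child’, 1: ’Husband’, 1: ’Not-in-family’, 4: ’Unmarried’, 5: ’Wife’, and 2: ’Other-relative’.
#   (NOTE: these values repeat the relationship mapping and the code 1 appears twice; this looks like an error in the task statement. Verify the actual codes against the dataset.)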

# • Gender: Where 0: ’Female’ and 1: ’Male’.

# • capital-gain: Represents the capital gain of the individual.

# • capital-loss: Denotes the capital loss of the individual.

# • hours-per-week: Specifies the number of hours per week the individual works.

# • native-country: Represents the native country of the individual.

# • income: Specifies the income level Where 0: ’<=50K’ and 1 : ’>50K’

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Obtain the Spark session (an already-running session is reused)
spark = SparkSession.builder.appName("IncomePrediction").getOrCreate()

# Load the income dataset with pandas and convert it to a Spark DataFrame
df = spark.createDataFrame(pd.read_csv('income_for_adult.csv'))

# Find the average weekly hours worked by individuals grouped by gender.
# Rows whose gender or weekly hours are missing are excluded first; otherwise
# they show up as a meaningless "NaN" gender group with a NaN average.
average_hours = (
    df.dropna(subset=["gender", "hours-per-week"])
      .groupBy("gender")
      .agg({"hours-per-week": "avg"})
      .orderBy("gender")
)
average_hours.show()

# Remove rows with a missing value in any column before featurization
df = df.dropna()

# Encode the categorical columns as "<column>_index" columns.
# All indexers are fitted on the cleaned frame before any of them is applied.
categorical_cols = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'native-country',
]
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid='keep').fit(df)
    for name in categorical_cols
]
for indexer in indexers:
    df = indexer.transform(df)

# Assemble the raw numeric columns and the index columns into the "features" vector
assembler = VectorAssembler(inputCols=['age', 'workclass_index', 'fnlwgt', 'education_index',
                                       'educational-num', 'marital-status_index',
                                       'occupation_index', 'relationship_index',
                                       'race_index', 'gender', 'capital-gain',
                                       'capital-loss', 'hours-per-week',
                                       'native-country_index'], outputCol="features")
df = assembler.transform(df)

# Convert the label column to numeric
indexer = StringIndexer(inputCol="income", outputCol="label")
df = indexer.fit(df).transform(df)

# Split the data into training and test sets (weights 0.8 / 0.2, fixed seed)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Create a LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Train the model
model = lr.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model: accuracy of "prediction" against "label" on the test split
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


+------+-------------------+
|gender|avg(hours-per-week)|
+------+-------------------+
|   0.0|    36.391899778236|
|   1.0|  42.42160043254934|
|   NaN|                NaN|
+------+-------------------+

Accuracy: 0.8399844014038736


----------------------------------

In [None]:
# Email spam classification involves classifying an email as spam or non-spam (binary classification).
# The idea is to use the content of the emails to determine whether enough signs are available to flag such
# email as a spam. Some of the most popular spam email classification algorithms include Random Forest Classifier.
# You are required to use the RandomForestClassifier in the SparkML classification library.




#  Few things to note about RandomForestClassifier:

#  It requires that the provided columns be named features and label specifically.
# All columns, including the label, must be numeric.


#  The dataset you will use has 30 columns in total:

#  [ ‘Email No.’, ‘the’, ‘to’, ‘for’, ‘a’, ‘you’, ‘hou’, ‘is’,
#  ‘this’, ‘i’, ‘your’, ‘we’, ’are’, ‘com’, ‘please’, ‘price’,
#  ‘attached’, ‘th’, ‘forward’, ‘u’, ‘click’, ‘unsubscribe’,
#  ‘pro’, ‘therefore’, ‘cc’, ‘prize’, ’hi’, ‘deadline’, ‘ur’, ‘Spam’ ]

#  Email No: string representing the number of email (index).
#  Spam: int (1 if spam, 0 if not).
#  The other 28 columns are integers having the number of occurrence of each word in the email.





#  You will need to do any necessary featurization before applying the RandomForestClassifier
#  on the data and calculating the accuracy.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Accuracy is a MulticlassClassificationEvaluator metric; the
# BinaryClassificationEvaluator imported above only provides ranking metrics
# (areaUnderROC by default), so the accuracy evaluator is imported here.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Obtain the Spark session (an already-running session is reused)
spark = SparkSession.builder.appName("EmailSpamClassification").getOrCreate()

# Load the dataset from CSV, letting Spark infer the column types
df = spark.read.csv("emails.csv", header=True, inferSchema=True)

# Assemble features: every column except the first ('Email No.') and the last ('Spam')
feature_cols = df.columns[1:-1]
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(df)

# Split the dataset into training and test sets (weights 0.8 / 0.2, fixed seed)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train the RandomForestClassifier; the explicit seed keeps the fitted forest
# the same from one run to the next.
rf = RandomForestClassifier(labelCol='Spam', featuresCol='features', seed=42)
model = rf.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Accuracy: fraction of test emails whose predicted class equals the 'Spam' label
evaluator = MulticlassClassificationEvaluator(
    labelCol='Spam', predictionCol='prediction', metricName='accuracy'
)
accuracy = evaluator.evaluate(predictions)

# Area under the ROC curve: the number the previous version of this cell
# printed under the name "Accuracy"
auc_evaluator = BinaryClassificationEvaluator(labelCol='Spam', metricName='areaUnderROC')
area_under_roc = auc_evaluator.evaluate(predictions)

# Print both metrics under their correct names
print(f"Accuracy: {accuracy}")
print(f"Area under ROC: {area_under_roc}")

Accuracy: 0.9100533738507083
