In [1]:
# Configure the following to use YOUR GCP setup

# 1. Configure the Project ID (not Project Name!!!) as per your GCP Dataproc setup
project_id = 'sazhakat-cis415-su24a'

# 2. Configure Bucket name as per your Google Cloud Storage setup
#    (this value is placed directly after "gs://" when the full path is built below)
bucket = 'sazhakat_data_for_gcp_labs/data_for_GCP_labs'

# 3. Configure the path to the movie reviews data file as per your Google Cloud Storage setup
#    If your setup is exactly as per the instructions in GCP Lab 1c and in this lab:
#       --- you will not need to make any changes to the below line.
#    If your setup is different (due to whatever reason - doesn't matter),
#       --- just update the below line to reflect the path as per YOUR Google Cloud Storage setup
path_to_data_files = "/data_for_text_mining_lab/"

# 4. Configure the appropriate data file to be used for the task
#       Keep exactly ONE of the two lines below uncommented, based on the following:
#          In Google Colab, you should build/test with SMALL DATA
#          In GCP, first you should run with SMALL DATA
#             and finally, you should run with BIG DATA

# movie_reviews_file_name = "small_data_movie_reviews.txt"
movie_reviews_file_name = "big_data_movie_reviews.txt"

# Lastly, we will build the full path of the data file and confirm it's correct
# You do not need to change this line
full_file_path = "gs://" + bucket + path_to_data_files + movie_reviews_file_name

# Let's print out all the configurations and ensure that they are correct
print(f"ProjectID (and not the Project Name) is: {project_id}")
print(f"Bucket name is: {bucket}")
# Only the two known file names are accepted; anything else prints a loud warning
# (the cell does not stop, so check the output before running the rest of the notebook)
if movie_reviews_file_name == "small_data_movie_reviews.txt":
  print(f"We will run this task for SMALL DATA ({movie_reviews_file_name})")
elif movie_reviews_file_name == "big_data_movie_reviews.txt":
  print(f"We will run this task for BIG DATA ({movie_reviews_file_name})")
else:
  print("-"*20)
  print(f"Incorrect data file name - {movie_reviews_file_name}!! CHECK & FIX!!!")
  print("-"*20)

print(f"Full path to the data file is {full_file_path}")

# You should not need to make any other change in the code below, unless your setup is different from the lab instructions

ProjectID (and not the Project Name) is: sazhakat-cis415-su24a
Bucket name is: sazhakat_data_for_gcp_labs/data_for_GCP_labs
We will run this task for BIG DATA (big_data_movie_reviews.txt)
Full path to the data file is gs://sazhakat_data_for_gcp_labs/data_for_GCP_labs/data_for_text_mining_lab/big_data_movie_reviews.txt


In [3]:
# Typically, any big data platform (like GCP Dataproc) will have PySpark pre-installed
# On all other platforms (e.g. your laptop, Google Colab etc.), PySpark will not be pre-installed.
# This paragraph is to check if PySpark is available in the system and install if it's not available
# You should expect this paragraph to RUN the PySpark installation in Google Colab
# You should expect this paragraph NOT TO RUN the PySpark installation in GCP Dataproc

# Probe for PySpark by attempting the import; the result is kept as a 'Y'/'N' flag
# because the install step below compares against those exact strings.
try:
  from pyspark.sql import SparkSession
  pyspark_available = 'Y'
except ImportError:
  # Only a failed import means "PySpark is not installed". A bare `except:` would
  # also swallow KeyboardInterrupt/SystemExit and trigger a pointless install.
  pyspark_available = 'N'

# If PySpark is not installed, then go through all these steps

if pyspark_available == 'N':
  # Refresh the apt package index before installing anything
  !apt-get update

  # Install Java (headless OpenJDK 8); the command's standard output is discarded
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null

  # Download Spark 3.0.0 built for Hadoop 3.2 (change the version number if needed;
  # the tar file name and SPARK_HOME below must then be changed to match)
  !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

  # Unpack the Spark archive into the current folder
  !tar xf spark-3.0.0-bin-hadoop3.2.tgz

  # Point JAVA_HOME and SPARK_HOME at the Java and Spark installations.
  # NOTE(review): SPARK_HOME assumes the archive was unpacked under /content - confirm for your environment
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

  # Install findspark using pip
  !pip install -q findspark

  # NOTE(review): presumably findspark uses SPARK_HOME to make pyspark importable - confirm
  import findspark
  findspark.init()

  # Create (or reuse) a Spark session that runs against a local master
  from pyspark.sql import SparkSession
  spark = SparkSession.builder.master("local[*]").getOrCreate()

  # To access Google Cloud Storage
  from google.cloud import storage
  import google.auth

  !pip install gcsfs
  import gcsfs

  # Authenticate the user through the google.colab helper (this import only exists in Colab)
  from google.colab import auth
  auth.authenticate_user()

  # Fetch the default credentials, then set the gcloud project to the project_id
  # configured in the first cell ({project_id} is interpolated into the shell command)
  credentials, default_project_id = google.auth.default()
  !gcloud config set project {project_id}
else:
    # Spark / PySpark already pre-installed in the environment
    print("PySpark already pre-installed!")


PySpark already pre-installed!


In [4]:
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, HashingTF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import lit

# Normally, we will not need Pandas if we are working with PySpark (since PySpark provides the dataframe capabilities)
# However, we will need pandas just for one step in this task: for reading files from Google Cloud Storage
#     This is because it's not straightforward to set the configurations correctly for letting spark read from GCS
from pandas import DataFrame, read_csv
# Added this line in SP24 since pandas removed iteritems from DataFrame object in 2024
# (it re-creates the old name as an alias of DataFrame.items for code that still calls iteritems)
DataFrame.iteritems = DataFrame.items

from google.cloud import storage

# Google Cloud Storage client, constructed with no explicit arguments.
# NOTE(review): presumably it picks up the environment's default credentials/project - confirm
client = storage.Client()

# Confirmation message that this cell ran to completion
print(f"Package imports done")

Package imports done


In [5]:
# Import necessary libraries
import pandas as pd
import numpy as np

# Seed NumPy's global random generator so that repeated runs draw the same numbers
np.random.seed(42)

# Row counts for the two datasets: the small one is 5% of the big one
big_dataset_size = 1_000_000
small_dataset_size = int(0.05 * big_dataset_size)

# Generate synthetic data
def generate_synthetic_data(size):
    """Build a synthetic marketing-campaign DataFrame with `size` rows.

    All values are drawn from NumPy's global random generator, so the result
    depends on the current ``np.random`` seed/state. Nine columns are created:
    five campaign metrics, a 0/1 ``Campaign_Success`` label and three
    categorical columns. The frame is then deliberately dirtied:

    * for each column in turn, ``size // 100`` row labels are drawn
      (duplicates possible) and set to NaN;
    * ``size // 100`` row labels are drawn (duplicates possible) and their
      ``Campaign_Cost`` is multiplied by 10 to act as outliers.

    Returns the resulting pandas DataFrame.
    """
    dirty_rows_per_pass = size // 100

    # The draws stay in this exact order so that seeded output is unchanged.
    frame = pd.DataFrame({
        'Campaign_Reach': np.random.randint(1000, 100000, size),
        'Click_Through_Rate': np.random.uniform(0.01, 0.3, size),
        'Conversion_Rate': np.random.uniform(0.01, 0.1, size),
        'Customer_Engagement': np.random.uniform(1, 10, size),
        'Campaign_Cost': np.random.uniform(1000, 50000, size),
        'Campaign_Success': np.random.choice([0, 1], size),
        'Campaign_Type': np.random.choice(['Email', 'Social Media', 'Search', 'Display'], size),
        'Region': np.random.choice(['North America', 'Europe', 'Asia', 'South America'], size),
        'Season': np.random.choice(['Spring', 'Summer', 'Fall', 'Winter'], size),
    })

    # The two integer-valued columns become floats so they can hold NaN
    for int_column in ('Campaign_Reach', 'Campaign_Success'):
        frame[int_column] = frame[int_column].astype(float)

    # Missing values: one fresh batch of random rows per column
    for column_name in frame.columns:
        null_rows = np.random.randint(0, size, dirty_rows_per_pass)
        frame.loc[null_rows, column_name] = np.nan

    # Outliers: inflate the cost of one more random batch of rows tenfold
    outlier_rows = np.random.randint(0, size, dirty_rows_per_pass)
    frame.loc[outlier_rows, 'Campaign_Cost'] *= 10

    return frame

# Build the full dataset first, then take the small one as a random subset of its rows
big_dataset = generate_synthetic_data(size=big_dataset_size)
small_dataset = big_dataset.sample(n=small_dataset_size)

# Write both frames to CSV in the working directory, without the row index
small_and_big = {'big_dataset.csv': big_dataset, 'small_dataset.csv': small_dataset}
for csv_name in small_and_big:
    small_and_big[csv_name].to_csv(csv_name, index=False)



In [6]:
# Import necessary libraries
import pandas as pd

# Read the small dataset back from the CSV written earlier
small_dataset = pd.read_csv('small_dataset.csv')

# Summarise every column (include='all' covers the non-numeric columns as well)
descriptive_statistics = small_dataset.describe(include='all')

# Show the summary table
print(descriptive_statistics)


        Campaign_Reach  Click_Through_Rate  Conversion_Rate  \
count     49520.000000        49494.000000     49493.000000   
unique             NaN                 NaN              NaN   
top                NaN                 NaN              NaN   
freq               NaN                 NaN              NaN   
mean      50476.091175            0.154593         0.055009   
std       28594.412579            0.083627         0.025979   
min        1000.000000            0.010001         0.010001   
25%       25666.000000            0.082332         0.032747   
50%       50535.500000            0.154439         0.054666   
75%       75309.250000            0.226745         0.077495   
max       99999.000000            0.299996         0.100000   

        Customer_Engagement  Campaign_Cost  Campaign_Success Campaign_Type  \
count          49498.000000   49511.000000      49502.000000         49536   
unique                  NaN            NaN               NaN             4   
top      

In [7]:
# Import necessary libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import f1_score

# Load the small dataset
small_dataset = pd.read_csv('small_dataset.csv')

# Drop rows with any null values
small_dataset = small_dataset.dropna()

# Select feature columns and target column
feature_cols = ['Campaign_Reach', 'Click_Through_Rate', 'Conversion_Rate', 'Customer_Engagement', 'Campaign_Cost']
X = small_dataset[feature_cols]
y = small_dataset['Campaign_Success']

# Split the data into training (70%) and testing (30%) sets; the split is seeded
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train Logistic Regression model
# max_iter matches the big-data run so the two F1 scores come from the same model settings
lr_model = LogisticRegression(max_iter=1000)
lr_model.fit(X_train, y_train)

# Predict using the Logistic Regression model
lr_predictions = lr_model.predict(X_test)

# Calculate F1 Score for Logistic Regression model
lr_f1_score = f1_score(y_test, lr_predictions)

# Train Decision Tree model
# Seeded like the train/test split, so re-running the cell reproduces the same tree and score
dt_model = DecisionTreeClassifier(random_state=42)
dt_model.fit(X_train, y_train)

# Predict using the Decision Tree model
dt_predictions = dt_model.predict(X_test)

# Calculate F1 Score for Decision Tree model
dt_f1_score = f1_score(y_test, dt_predictions)

# Print the results
print("Logistic Regression -> F1 Score:", lr_f1_score)
print("Decision Tree -> F1 Score:", dt_f1_score)


Logistic Regression -> F1 Score: 0.5403714033058976
Decision Tree -> F1 Score: 0.49847272727272723


In [8]:
# Import necessary libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import f1_score

# Load the big dataset
big_dataset = pd.read_csv('big_dataset.csv')

# Drop rows with any null values
big_dataset = big_dataset.dropna()

# Select feature columns and target column
feature_cols = ['Campaign_Reach', 'Click_Through_Rate', 'Conversion_Rate', 'Customer_Engagement', 'Campaign_Cost']
X = big_dataset[feature_cols]
y = big_dataset['Campaign_Success']

# Split the data into training (70%) and testing (30%) sets; the split is seeded
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train Logistic Regression model
lr_model = LogisticRegression(max_iter=1000)  # Increase max_iter if needed
lr_model.fit(X_train, y_train)

# Predict using the Logistic Regression model
lr_predictions = lr_model.predict(X_test)

# Calculate F1 Score for Logistic Regression model
lr_f1_score = f1_score(y_test, lr_predictions)

# Train Decision Tree model
# Seeded like the train/test split, so re-running the cell reproduces the same tree and score
dt_model = DecisionTreeClassifier(random_state=42)
dt_model.fit(X_train, y_train)

# Predict using the Decision Tree model
dt_predictions = dt_model.predict(X_test)

# Calculate F1 Score for Decision Tree model
dt_f1_score = f1_score(y_test, dt_predictions)

# Print the results
print("Logistic Regression -> F1 Score:", lr_f1_score)
print("Decision Tree -> F1 Score:", dt_f1_score)


Logistic Regression -> F1 Score: 0.6288095263964188
Decision Tree -> F1 Score: 0.5010352419695844


In [9]:

# Load the data
def load_data(file_path=None, sep="\t"):
    """Read a delimited data file into a pandas DataFrame.

    Parameters
    ----------
    file_path : str, optional
        Location of the file to read. When omitted, the notebook-level
        ``full_file_path`` is used, which is what the original no-argument
        call did. Passing a path lets the same loader read other files
        (for example the campaign CSVs written earlier in this notebook).
    sep : str, optional
        Column delimiter handed to ``pd.read_csv``; defaults to a tab.

    Returns
    -------
    pandas.DataFrame
        The loaded data. If anything goes wrong while reading, the error is
        printed (not raised) and an empty DataFrame is returned, so callers
        can check ``.empty`` instead of handling ``None``.
    """
    # Resolve the global only when no explicit path was given
    path = full_file_path if file_path is None else file_path
    print(f"Reading the data file: {path}")
    try:
        data = pd.read_csv(path, sep=sep)
        print("Data loaded successfully. First few rows:")
        print(data.head())
        return data
    except Exception as e:
        # Deliberate best-effort: report the problem and hand back an empty frame
        print(f"An error occurred: {e}")
        return pd.DataFrame()

# Perform EDA
def perform_eda(df):
    """Print a quick exploratory report for `df`.

    Two sections are printed, each as a heading line followed by the table:
    the output of ``df.describe()`` and the per-column null counts from
    ``df.isnull().sum()``. Nothing is returned.
    """
    report_sections = (
        ("Summary statistics:", lambda: df.describe()),
        ("Checking for null values:", lambda: df.isnull().sum()),
    )
    # Each table is computed only after its heading has been printed
    for heading, build_table in report_sections:
        print(heading)
        print(build_table())

# Feature selection
def feature_selection(df):
    """Split `df` into model inputs and target.

    Returns a ``(features, target)`` tuple: ``features`` is `df` restricted to
    the five campaign columns listed below (in that order) and ``target`` is
    the ``Campaign_Success`` column. Adjust the names if your data differs.
    """
    predictor_names = [
        'Campaign_Reach',
        'Click_Through_Rate',
        'Conversion_Rate',
        'Customer_Engagement',
        'Campaign_Cost',
    ]
    features = df[predictor_names]
    target = df['Campaign_Success']
    return features, target

# Feature extraction (if any)
def feature_extraction(df):
    """Return `df` unchanged.

    This is a pass-through hook: no features are derived yet, so the very
    same object that was passed in is handed back.
    """
    extracted = df  # nothing to extract for now
    return extracted

# Model definition
def model_definition():
    """Create the two (untrained) classifiers used by the pipeline.

    Returns
    -------
    tuple
        ``(logistic_regression, decision_tree)``, in that order.

    The settings match the standalone big-data cell of this notebook:
    ``max_iter=1000`` for logistic regression, and a fixed ``random_state``
    for the decision tree so that repeated runs give the same tree and the
    same F1 score.
    """
    lr = LogisticRegression(max_iter=1000)
    dt = DecisionTreeClassifier(random_state=42)
    return lr, dt

# Model training
def model_training(X_train, y_train, models):
    """Fit every model in `models` on the training data.

    Each model's ``fit(X_train, y_train)`` is called in order. The return
    value is a new list holding the same model objects, in the same order.
    """
    fitted = []
    for estimator in models:
        estimator.fit(X_train, y_train)
        fitted += [estimator]
    return fitted

# Model evaluation
def model_evaluation(models, X_test, y_test):
    """Score each model on the test data.

    For every model, in order, predictions are made with ``predict(X_test)``
    and compared to `y_test` with ``f1_score``. Returns the list of scores in
    the same order as `models`.
    """
    return [f1_score(y_test, fitted.predict(X_test)) for fitted in models]

# Print evaluation metrics
def print_evaluation_metrics(f1_scores):
    """Print one "<model> -> F1 Score: <score>" line per score.

    Scores are paired by position with the names "Logistic Regression" and
    "Decision Tree"; pairing stops at the shorter of the two sequences.
    """
    labels = ("Logistic Regression", "Decision Tree")
    for label, value in zip(labels, f1_scores):
        print(f"{label} -> F1 Score:", value)

# Main function to run the pipeline
def main():
    data = load_data()
    if data.empty:  # Check if DataFrame is empty due to errors
        print("Pipeline stopped due to error in loading data.")
        return

    perform_eda(data)
    X, y = feature_selection(data)
    X = feature_extraction(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    models = model_definition()
    trained_models = model_training(X_train, y_train, models)
    f1_scores = model_evaluation(trained_models, X_test, y_test)
