# Storage Solutions for Big Data - CA1


Assessment CA 1 by **Yulianna Tsaruk**.
Programme Title: Higher Diploma in Science in AI Applications
Module Title: Storage Solutions for Big Data




## Code contents:
- **Exploratory Data Analysis & Processing (this file)**
- **[Training model and Usage Example](./2_training.ipynb)**



## Introduction

For this project I'm using HDFS (Hadoop Distributed File System) as the primary storage system, Apache Spark for processing, and PySpark, the Python interface for Apache Spark.

In this file, I will load the dataset, process it and save it in Apache Parquet, a column-oriented data storage format in the Apache Hadoop ecosystem designed for efficient data storage and retrieval.

In [1]:
# !pip install holidays

In [2]:
# import spark instances
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import to_date, dayofmonth, month, year, col, explode, \
                unix_timestamp, when, regexp_replace, mean, concat_ws, \
                dayofweek, udf
from pyspark.sql.types import FloatType, BooleanType, StringType

# import additional libraries
import pandas as pd
import matplotlib.pyplot as plt
import holidays

In [3]:
# Create a Spark session with configurations
spark = (SparkSession.builder
    .appName("Tokyo Airbnb Analysis")
    # hardware-related configs; comment them out if your machine does not need them
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "6g")  
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.network.timeout", "600s") 
    .config("spark.executor.heartbeatInterval", "120s")
    
    # to output more
    .config("spark.sql.debug.maxToStringFields", 100)
    .getOrCreate())


24/04/24 18:50:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# simulate output similar to the pandas.DataFrame.info() method
def print_dataframe_info(df: DataFrame):
    """
    Print basic information about the data (column names, null counts, and data types) for a Spark DataFrame.

    Args:
    df (DataFrame): The Spark DataFrame to analyze.
    """
    # DataFrame shape
    total_rows = df.count()
    total_cols = len(df.columns)
    

    # Collect column names and their data types
    schema_info = [(field.name, field.dataType) for field in df.schema.fields]
    out_ = []
    for column, dtype in schema_info:
        null_count = df.filter(col(column).isNull()).count()
        out_.append({'Column': column, 'Nulls': null_count, 'Type': dtype.simpleString()})
    
    print(pd.DataFrame(out_))
    print()
    print(f"\tA dataset shape: {total_rows} rows, {total_cols} columns.")

pd.set_option('display.max_rows', None) # show all rows

## Load 1st dataset

In [5]:
# location of 1st file in Hadoop
dataset_path = "/user1/dataset/calendar.csv" 

# load data
df_calendar = spark.read.csv(dataset_path, header=True, # 1st line is a header
                             inferSchema=True           # detect data types automatically
                            )
df_calendar.show(5)

                                                                                

+----------+----------+---------+----------+--------------+--------------+--------------+
|listing_id|      date|available|     price|adjusted_price|minimum_nights|maximum_nights|
+----------+----------+---------+----------+--------------+--------------+--------------+
|    197677|2023-06-29|        f|$11,000.00|    $11,000.00|             3|          1125|
|    197677|2023-06-30|        f|$11,000.00|    $11,000.00|             3|          1125|
|    197677|2023-07-01|        f|$11,000.00|    $11,000.00|             3|          1125|
|    197677|2023-07-02|        f|$11,000.00|    $11,000.00|             3|          1125|
|    197677|2023-07-03|        f|$11,000.00|    $11,000.00|             3|          1125|
+----------+----------+---------+----------+--------------+--------------+--------------+
only showing top 5 rows



### Explore and Process the data

In [6]:
# Nulls and types summary
print_dataframe_info(df_calendar)



           Column  Nulls    Type
0      listing_id      0  bigint
1            date      0    date
2       available      0  string
3           price      0  string
4  adjusted_price      0  string
5  minimum_nights    332     int
6  maximum_nights    332     int

	A dataset shape: 4078413 rows, 7 columns.


                                                                                

In [7]:
# Statistical summary
df_calendar.describe().show()



+-------+--------------------+---------+-------------+--------------+------------------+------------------+
|summary|          listing_id|available|        price|adjusted_price|    minimum_nights|    maximum_nights|
+-------+--------------------+---------+-------------+--------------+------------------+------------------+
|  count|             4078413|  4078413|      4078413|       4078413|           4078081|           4078081|
|   mean|2.520073870723633...|     null|         null|          null|2.7664423536462373| 676.3645109059873|
| stddev|3.745944660493916...|     null|         null|          null|12.338796688561727|449.44182861714916|
|    min|              197677|        f|$1,000,000.00| $1,000,000.00|                 1|                 1|
|    max|  923132709196905769|        t|  $999,999.00|   $999,999.00|              1000|              1125|
+-------+--------------------+---------+-------------+--------------+------------------+------------------+



                                                                                

Some columns have the wrong dtype. For example, describe() cannot compute the mean of the price column because its values are strings.
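The min and max rows of the summary above show the same problem from another angle: strings are compared character by character, so '$1,000,000.00' sorts before '$999,999.00'. A minimal sketch in plain Python (no Spark needed), using three price strings that appear in the outputs above:

```python
# Prices as they appear in the raw CSV: strings with a currency sign and commas
raw_prices = ["$11,000.00", "$1,000,000.00", "$999,999.00"]

# String comparison is lexicographic, so the ordering ignores the numeric value
string_min = min(raw_prices)
string_max = max(raw_prices)

# The same three amounts as numbers are ordered by value instead
amounts = [11000.0, 1000000.0, 999999.0]
numeric_min = min(amounts)
numeric_max = max(amounts)

print("string min/max: ", string_min, string_max)
print("numeric min/max:", numeric_min, numeric_max)
```

The string "minimum" is in fact the largest amount, which is why the column has to be converted before any statistics are meaningful.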

In [8]:
# Check amount of unique values in the 'listing_id' column
listing_gr = df_calendar.groupBy("listing_id").count()

print('There are', listing_gr.count(), 'properties in this dataset.')

pandas_df_listing = listing_gr.orderBy(col("count")).toPandas()
pandas_df_listing['days_count'] = pandas_df_listing['count']
result = pandas_df_listing.groupby('days_count').size().reset_index(name='properties_count')
pandas_df_listing.head()

                                                                                

There are 11175 properties in this dataset.


                                                                                

           listing_id  count  days_count
0             4050925     33          33
1            12597472    339         339
2            19413667    339         339
3  742241466430561437    339         339
4  806171362910072416    339         339


In [9]:
pandas_df_listing.head(6)

           listing_id  count  days_count
0             4050925     33          33
1            12597472    339         339
2            19413667    339         339
3  742241466430561437    339         339
4  806171362910072416    339         339
5  772140258073141388    339         339


In [10]:
result

   days_count  properties_count
0          33                 1
1         339                 5
2         365             11169


One property has data for only 33 days, while most other properties have data for a full year (365 days). I will drop this property; for the 5 properties that have data for only 339 days, I will impute the missing values.
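Before imputing, it helps to know which dates are actually absent for a listing. A minimal sketch in plain Python, assuming each listing should have one row per day over a 365-day window; the helper `missing_dates` and the sample listing are hypothetical and not part of the notebook:

```python
from datetime import date, timedelta

def missing_dates(present, start, days=365):
    """Return the dates in [start, start + days) that are not in `present`."""
    seen = set(present)
    expected = (start + timedelta(days=i) for i in range(days))
    return [d for d in expected if d not in seen]

# Hypothetical listing: a full year starting 2023-06-29 with a 26-day gap
start = date(2023, 6, 29)
all_days = [start + timedelta(days=i) for i in range(365)]
present = all_days[:100] + all_days[126:]   # 339 rows, like the 5 listings above

gaps = missing_dates(present, start)
print(len(present), "rows present,", len(gaps), "dates missing")
print("first gap:", gaps[0], "last gap:", gaps[-1])
```

Knowing whether the gap is one contiguous block or scattered days informs the choice between forward-filling and interpolation.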

In [11]:
listing_to_correct = list(pandas_df_listing[pandas_df_listing['count'] == 339]['listing_id'])
listing_to_correct

[12597472,
 19413667,
 742241466430561437,
 806171362910072416,
 772140258073141388]

In [12]:
# drop rows for 'listing_id' == 4050925 because too much data is missing
df_calendar = df_calendar.filter(col("listing_id") != 4050925)
# For the other listings, which have data for only 339 days out of 365, I will impute the missing values.

In [13]:
from pyspark.sql.functions import col, lit, coalesce, first, last, monotonically_increasing_id, lag, lead


In [14]:
filtered_data = df_calendar.filter(col("listing_id").isin(listing_to_correct))
filtered_data

DataFrame[listing_id: bigint, date: date, available: string, price: string, adjusted_price: string, minimum_nights: int, maximum_nights: int]

In [15]:
# Create a monotonically increasing ID for row ordering
imputed_data_interpolate = filtered_data.withColumn("row_id", monotonically_increasing_id())


In [19]:
imputed_data_interpolate.show(3)

+----------+----------+---------+----------+--------------+--------------+--------------+------+
|listing_id|      date|available|     price|adjusted_price|minimum_nights|maximum_nights|row_id|
+----------+----------+---------+----------+--------------+--------------+--------------+------+
|  12597472|2023-06-29|        f|$60,000.00|    $54,000.00|             2|          1125|     0|
|  12597472|2023-06-30|        f|$60,000.00|    $54,000.00|             2|          1125|     1|
|  12597472|2023-07-01|        f|$70,000.00|    $63,000.00|             2|          1125|     2|
+----------+----------+---------+----------+--------------+--------------+--------------+------+
only showing top 3 rows



In [None]:
# df_calendar = df_calendar.withColumn("date", to_date(df_calendar.date, 'yyyy-MM-dd'))

In [None]:
# Options in col 'available'
df_calendar.select('available').distinct().show()

It's worth noting that, although the price carries a US dollar sign, it is in Japanese yen; the sign and the thousands separators must be removed before the values can be converted to float.
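The conversion amounts to deleting the '$' and ',' characters before casting. A minimal sketch with Python's `re` module; the helper name `parse_price` is hypothetical, and the character class `[$,]` assumes these are the only non-numeric characters in the column:

```python
import re

def parse_price(text):
    """Strip the currency sign and thousands separators, then convert to float."""
    # Inside a character class, "$" is a literal dollar sign, not an anchor
    return float(re.sub(r"[$,]", "", text))

small = parse_price("$11,000.00")
large = parse_price("$1,000,000.00")
print(small, large)
```

Using a raw string (`r"..."`) for the pattern avoids Python's invalid-escape warning that a plain `"[\$,]"` would trigger.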

In [None]:
# check min/max nights values
#df_calendar.filter(col('minimum_nights')>=90).groupBy("minimum_nights").count().sort(col('count').desc())
nights_df = df_calendar.select(col('minimum_nights'), col('maximum_nights')).toPandas()
nights_df.describe()

In [None]:
nights_df.head()

In [None]:
nights_df.plot(kind='density')

In [None]:
# Data Preprocessing
df_calendar_new = df_calendar \
    .withColumn("available", when(col("available") == "t", 1).otherwise(0)) \
    .withColumn("price", regexp_replace(col("price"), r"[$,]", "").cast(FloatType())) \
    .withColumn("adjusted_price", regexp_replace(col("adjusted_price"), r"[$,]", "").cast(FloatType())) \
    .withColumn("date_unix", unix_timestamp(col("date")))
    #.withColumn("day", dayofmonth(col("date"))) \
    #.withColumn("month", month(col("date"))) \
    #.withColumn("year", year(col("date")))


    # .orderBy(["date", "listing_id"])

    #.withColumn("days_since", datediff(col("date"), lit("2023-07-29"))) \ # new col with day since last update
    #.withColumn("minimum_nights", col("minimum_nights").cast(IntegerType())) \
    #.withColumn("maximum_nights", col("maximum_nights").cast(IntegerType()))

In [None]:
# Find out if there's a difference in cols "price" and "adjusted_price"
df_with_diff = df_calendar_new.withColumn("price_difference", col("price") - col("adjusted_price"))

# Filter rows where price_difference is not zero
rows_with_difference = df_with_diff.filter(col("price_difference") != 0)

# count how many rows with differences
rows_with_difference.count()

In [None]:
rows_with_difference.filter(col('adjusted_price')>col('price')).count()

In [None]:
rows_with_difference.filter(col('adjusted_price')<col('price')).count()

In [None]:
df_calendar_new.filter(col('adjusted_price')==col('price')).count()

I save the 'price' column name to drop it later; 'adjusted_price' is kept and will be renamed to 'price'.

In [None]:
# 'minimum_nights' and 'maximum_nights' are dropped as well, because their values don't look correct
col_to_drop = ['price', 'minimum_nights', 'maximum_nights']

In [None]:
df_calendar_new.show(3)

In [None]:
df_calendar_new.select(col('price')).describe().toPandas()

In [None]:
all_price = df_calendar_new.select(col('price')).toPandas()

In [None]:
all_price.groupby('price').size().reset_index(name='price_count') \
.sort_values( 'price', ascending=False).head()#.plot()


In [None]:
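# inspect a few specific listings; isin() matches any of the IDs
# (chaining == with & would require one row to equal all three IDs and match nothing)
# note: df_corrected is created in the Outliers section below
df_corrected.filter(col('listing_id').isin(43547243, 8108276, 43547291)).show(5)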

## Outliers

In [None]:
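from pyspark.sql.functions import col, lit, min as spark_min  # alias avoids shadowing Python's built-in min
from pyspark.sql.window import Window

# Approximate quartiles (relative error 0.05) are enough to set an outlier threshold
quartiles = df_calendar_new.approxQuantile("price", [0.25, 0.75], 0.05)
Q1, Q3 = quartiles[0], quartiles[1]
IQR = Q3 - Q1

# Define bounds for the outliers
# The lower bound is the smallest observed price, so no value falls below it
lower_bound = df_calendar_new.select(spark_min(col('price')).alias('lowest_price')).first()['lowest_price']

# Upper bound: Q3 + 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR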

In [None]:
upper_bound

In [None]:
df_calendar_new.filter((col('price') > upper_bound)).show(5)

In [None]:
df_calendar.filter(col('listing_id') == 1732795).filter(col('date')=='2023-12-29').show(3)

In [None]:
from pyspark.sql.functions import when
df_corrected = df_calendar_new.withColumn("price", 
                             when(col("price") < lower_bound, lower_bound)
                             .when(col("price") > upper_bound, None)
                             .otherwise(col("price")))

In [None]:
df_corrected.where(col("price").isNull()).show()
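The rule applied in this section is the upper fence Q3 + 1.5 * IQR, with prices above it set to null. A minimal sketch in plain Python on made-up prices; it uses exact quartiles from `statistics.quantiles`, whereas `approxQuantile` in the notebook is approximate, so the fence on real data would differ slightly:

```python
from statistics import quantiles

# Hypothetical nightly prices in yen, with one extreme value
prices = [8000, 9000, 10000, 11000, 12000, 13000, 14000, 999999]

# Quartiles of the sample (the middle value is the median and is not needed here)
q1, _median, q3 = quantiles(prices, n=4, method="inclusive")
iqr = q3 - q1
upper_bound = q3 + 1.5 * iqr

# Mirror the notebook: values above the fence become None (null), others are kept
corrected = [p if p <= upper_bound else None for p in prices]
print("upper bound:", upper_bound)
print(corrected)
```

Replacing outliers with null rather than dropping the rows keeps the calendar complete, at the cost of having to handle the nulls before training.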

## Load 2nd dataset

In [None]:
df_list = spark.read.csv("/user1/dataset/listings.csv",
    header=True, # 1st line is a header
    quote='"',  
    escape='"', 
    multiLine=True,  # Handles new lines in fields
    inferSchema=True,  # detect data types automatically
    ignoreLeadingWhiteSpace=True,  # Ignoring white space in a line
    ignoreTrailingWhiteSpace=True)

In [None]:
df_list.show(2)

In [None]:
# The output above is messy, so let's display it as a pandas DataFrame
df_list.limit(5).toPandas()

In [None]:
# check through the schema that everything loaded correctly
df_list.printSchema()

From this dataset I'll take some columns to enrich the first one. Potentially useful columns are:
* neighbourhood_cleansed
* host_identity_verified
* location (latitude/longitude)
* property_type
* instant_bookable

In [None]:
df_list.select('property_type').distinct().toPandas()

In [None]:
df_list.select('room_type').distinct().toPandas()

After checking the unique values, I see that the feature I want is called 'room_type', while 'property_type' consists of marketing names.

In [None]:
print_dataframe_info(df_list)

In [None]:
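# check unique values in the 'id' column and compare with the calendar dataset
unique_ids_list = df_list.select("id").distinct()
# assumption: unique_ids was meant to hold the distinct listing IDs of the calendar data
unique_ids = df_calendar.select("listing_id").distinct()

print('Unique IDs:', unique_ids_list.count(), 'difference from calendar:', unique_ids_list.count() - unique_ids.count())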

In [None]:
selected_cols = [
    'id',
    'neighbourhood_cleansed',
    'room_type',
    'host_identity_verified',
    'instant_bookable',
]
new_df = df_list.select(selected_cols)

In [None]:
# Merge new df with selected_cols and df_calendar on col id and listing_id
merged_df = new_df.join(df_calendar_new, new_df.id == df_calendar_new.listing_id, "inner")


In [None]:
merged_df.take(1)

In [None]:
merged_df = merged_df.drop('listing_id')

In [None]:
merged_df.take(1)

In [None]:
#fixing dtypes

df = merged_df \
    .withColumn("host_identity_verified", when(col("host_identity_verified") == "t", True).otherwise(False).cast(BooleanType())) \
    .withColumn("instant_bookable", when(col("instant_bookable") == "t", True).otherwise(False).cast(BooleanType()))
                

In [None]:
df.dtypes

## Analysis

In [None]:
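# 'available' is an integer flag (1/0), so compare explicitly instead of passing it as a boolean condition
df.where(col("available") == 1).show(2)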

In [None]:
# df.limit(5).toPandas()

In [None]:
df_busy_times = df.where(col("available") == False) \
                  .groupBy(year("date").alias("year"), month("date").alias("month")) \
                  .count() \
                  .orderBy("year", "month")


In [None]:
df_busy_times.show()

In [None]:
pandas_df = df_busy_times.toPandas()
pandas_df.sort_values(['year', 'month'], ascending=True, inplace=True)

In [None]:
pandas_df

In [None]:
pandas_df.plot(x='month', y='count', kind='bar')

In [None]:
# build a 'year_month' column, calculate the mean of 'price',
# and order the results by 'year_month'

df_price = df.where(col("price") > 0) \
                  .withColumn("year_month", concat_ws("-", year("date"), month("date"))) \
                  .groupBy("year_month") \
                  .agg(mean("price").alias("mean")) \
                  .orderBy('year_month').toPandas()

# df_price.sort_values(['year', 'month', 'year'], ascending=True, inplace=True)

In [None]:
df_price

In [None]:
df.select('id', 'price').distinct().sort(col("price").desc()).show(5)

In [None]:
df.where(col('id')== 561785108651423732).select('date', 'price', 'available').sort(col("price").desc(), 'date').show()

In [None]:
df_price['year_month'] = df_price['year_month'].astype('period[M]')
df_price.sort_values('year_month', ascending=True, inplace=True)

In [None]:
df_price.plot(x='year_month', y='mean', kind='bar'), df_price.plot(kind='line', x='year_month', y='mean')

In [None]:
df_price

In [None]:
df_list.unpersist()

In [None]:
df_calendar.unpersist()

## Feature Selection and Engineering

In [None]:
df.columns

I'm adding new date-based features so that the ML algorithm can pick up dependencies more easily:
- whether the date falls on a weekend
- whether the date is a holiday

Because I want to use a pipeline, this process needs to be integrated into it as a transformer.
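One detail worth checking for the weekend flag: Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), so the weekend values are 7 and 1. A minimal sketch in plain Python that reproduces this numbering from `date.isoweekday()`; the helper names are hypothetical:

```python
from datetime import date

def spark_dayofweek(d):
    """Emulate Spark's dayofweek numbering: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() gives Monday = 1 ... Sunday = 7, so shift by one and wrap
    return d.isoweekday() % 7 + 1

def is_weekend(d):
    """True for Saturday (7) and Sunday (1) in Spark's numbering."""
    return spark_dayofweek(d) in (1, 7)

# 2023-06-30 was a Friday, 2023-07-01 a Saturday, 2023-07-02 a Sunday, 2023-07-03 a Monday
for d in (date(2023, 6, 30), date(2023, 7, 1), date(2023, 7, 2), date(2023, 7, 3)):
    print(d, spark_dayofweek(d), is_weekend(d))
```

With this numbering, a check against the values 6 and 7 would flag Friday and Saturday rather than Saturday and Sunday.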

In [None]:
# Create a Custom Transformer with Weekend & Holiday Check

In [None]:
# if date is a weekend
# Spark's dayofweek returns 1 for Sunday through 7 for Saturday
df_date = df.withColumn("weekends", dayofweek(col("date")).isin([1, 7]))

In [None]:
# for holiday detection I use holidays module and user-defined function
jp_holidays = holidays.Japan()

def is_holiday(date):
    return date in jp_holidays

holiday_udf = udf(is_holiday, BooleanType())

df_date = df_date.withColumn("holiday", holiday_udf(col("date")))

In [None]:
df_date = df_date.sort('date')
# to use it as categorical feature
df_date = df_date.withColumn("date", col('date').cast(StringType()))
df_date.dtypes

In [None]:
#col_to_drop = ['price', 'id', 'maximum_nights','listing_id']

In [None]:
print_dataframe_info(df_date)

In [None]:
# Cleaning up 
# deleting cols that won't be used for training
col_to_drop += ['id', 'maximum_nights', 'date']
#col_to_drop = list(set(col_to_drop))
# print(df.columns, col_to_drop)
df_model = df_date.drop(*col_to_drop)

In [None]:
df_model = df_model.withColumnRenamed('adjusted_price', 'price')
df_model

In [None]:
# Save DataFrame to HDFS in Parquet format
df_model.write.parquet("/user1/dataset/db")

In [None]:
df_model.unpersist()
df_model= spark.read.parquet("/user1/dataset/db/")

## Train a model

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  when, col, unix_timestamp, regexp_replace # , dayofmonth, month, year
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
df_model.dtypes

In [None]:

# To encode categorical features
indexer = StringIndexer(inputCols=["room_type", "neighbourhood_cleansed"], #, 'date'],
                        outputCols=["type_indexed", 'neighbourhood_indexed']#, 'date_indexed']
                       )

# Assemble continuous features individually
price_vector = VectorAssembler(inputCols=["price"], outputCol="price_vec")
date_vector = VectorAssembler(inputCols=["date_unix"], outputCol="date_vec")

# Scale continuous features separately
price_scaler = StandardScaler(inputCol="price_vec", outputCol="scaled_price", withStd=True, withMean=True)

date_scaler = StandardScaler(inputCol="date_vec", outputCol="scaled_date", withStd=True, withMean=True)


# Combine numerical and categorical features into a single vector
assembler_f = VectorAssembler(
    inputCols=['scaled_price', 'scaled_date',
               # BOOLEAN
               'instant_bookable', 'host_identity_verified', 'weekends', 'holiday',
              # CATEGORICAL
               "type_indexed", 'neighbourhood_indexed'],
    outputCol="all_features"
)


# Split the data into training and testing sets
train_data, test_data = df_model.randomSplit([0.7, 0.3], seed=42)
# train_data

In [None]:
train_data.show(1)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create a classifier for the pipeline
classifier = RandomForestClassifier(labelCol="available",
                                    featuresCol="all_features",
                                    maxBins=375,
                                    seed=42
                                   )

# Build the grid from the classifier instance's params so they bind to this pipeline stage
param_grid = (ParamGridBuilder()
                 .addGrid(classifier.maxDepth, [3, 5, 8])
                 .addGrid(classifier.numTrees, [100, 200, 300])
                 .build())


In [None]:
# Define pipeline
pipeline = Pipeline(stages=[indexer, price_vector, date_vector, price_scaler, date_scaler, 
                            assembler_f, classifier])

#pipeline.fit(train_data).transform(train_data).show(2)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

# Define the evaluator; labelCol must name the target column, otherwise the default "label" is used
evaluator = MulticlassClassificationEvaluator(labelCol="available", metricName="accuracy")  # Replace with your preferred metric

# Define and run cross-validation
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator, numFolds=5)

# model = cv.fit(train_data)

In [None]:

# Train the model
model = pipeline.fit(train_data)

In [None]:
# Make predictions
predictions = model.transform(test_data)

In [None]:
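# Evaluate the model
# BinaryClassificationEvaluator defaults to areaUnderROC, so accuracy needs its own evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

auc_evaluator = BinaryClassificationEvaluator(labelCol="available", metricName="areaUnderROC")
auc = auc_evaluator.evaluate(predictions)

acc_evaluator = MulticlassClassificationEvaluator(labelCol="available", metricName="accuracy")
accuracy = acc_evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy}, AUC: {auc}")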
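Accuracy and area under the ROC curve measure different things: accuracy scores hard 0/1 predictions, while AUC scores how well the predicted probabilities rank positives above negatives. A minimal sketch in plain Python on made-up labels and scores; the 0.5 threshold and the helper names are assumptions for illustration, not part of the notebook:

```python
def accuracy(labels, scores, threshold=0.5):
    """Share of examples whose thresholded score matches the label."""
    preds = [1 if s >= threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)

def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical availability labels and predicted probabilities
labels = [1, 1, 0, 0]
scores = [0.9, 0.4, 0.35, 0.1]

acc = accuracy(labels, scores)
auc = roc_auc(labels, scores)
print(f"Accuracy: {acc}, AUC: {auc}")
```

Here every positive is ranked above every negative, so the ranking is perfect, yet one positive falls below the threshold and is misclassified; the two numbers therefore need not agree.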

In [None]:
#classifier.params

In [None]:
feature_names = assembler_f.getInputCols()
#.featureImportances


feature_importance_dict = dict(zip(feature_names, model.stages[-1].featureImportances.toArray()))


In [None]:
imp = pd.DataFrame(sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True),
                  columns=['feature', 'importance'])
imp.sort_values('importance', inplace=True)

In [None]:
imp.plot.barh(x='feature', grid=True)

In [None]:
df_model.filter(col('holiday')).show(3)

In [None]:
rdds = spark.sparkContext.getRDDStorageInfo()
for rdd_info in rdds:
    print(f"RDD Id: {rdd_info.rddId}, Name: {rdd_info.name}, StorageLevel: {rdd_info.storageLevel}, Cached Partitions: {rdd_info.numCachedPartitions}, Memory Size: {rdd_info.memSize}, Disk Size: {rdd_info.diskSize}")


## Real World Usage



In [None]:

#'date': [date_picker.value],
#'property_type': [property_type_dropdown.value],
#'neighbourhood_cleansed': [neighborhood_dropdown.value],
#'instant_bookable': [instant_bookable_toggle.value == 'Yes'],
#'host_identity_verified': [verified_host_toggle.value == 'Yes']


# Regression

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_date, datediff, lit,regexp_replace

from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Load the calendar data from the CSV file in HDFS
df = spark.read.csv("/user1/dataset/calendar.csv", header=True, inferSchema=True)

# Data Preprocessing
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd")) \
       .withColumn("days_since", datediff(col("date"), lit("2023-01-01"))) \
       .withColumn("available", when(col("available") == "t", 1).otherwise(0)) \
       .withColumn("price", regexp_replace(col("price"), r"[$,]", "").cast(FloatType())) \
       .withColumn("adjusted_price", regexp_replace(col("adjusted_price"), r"[$,]", "").cast(FloatType())) \
       .withColumn("minimum_nights", col("minimum_nights").cast(IntegerType())) \
       .withColumn("maximum_nights", col("maximum_nights").cast(IntegerType()))

In [None]:
df.show(5)

In [None]:
# Feature Engineering
assembler = VectorAssembler(inputCols=["days_since", "available", "minimum_nights", "maximum_nights"],
                            outputCol="features")

# Modeling
lr = LinearRegression(featuresCol="features", labelCol="price")

# Pipeline
pipeline = Pipeline(stages=[assembler, lr])


In [None]:
# Train/Test Split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df.show(2)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Training
model = pipeline.fit(train_df)

# Prediction
predictions = model.transform(test_df)

# Evaluation
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

