In [1]:
import sys
from pyspark.sql import SparkSession,  DataFrame, functions as F
from pyspark.sql.types import DoubleType


In [2]:
s3_source_path = "s3://datalake/raw/kaggle_airbnb/reviews.csv"  # raw reviews CSV read by the load cell below


In [3]:
# getOrCreate() reuses an already-running session if there is one
# (in that case only runtime SQL configs take effect, as the warning notes).
spark = (
    SparkSession.builder
    .appName("clean_reviews")
    .getOrCreate()
)


23/12/27 00:25:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Load the raw reviews CSV with a header row and an inferred schema.
# multiline=true lets quoted fields span several lines (presumably needed for the
# free-text `comments` column -- confirm against the data).
# Only one escape character can be configured: a later .option("escape", ...)
# silently replaces an earlier one, so it is set exactly once here. With
# escape='"', a quote inside a quoted field is written as "" (RFC 4180 style).
df = (
    spark.read
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .option("multiline", "true")
    .option("quote", '"')
    .option("escape", '"')
    .option("encoding", "UTF-8")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .csv(s3_source_path)
)

23/12/27 00:25:38 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [5]:
# Preview the first rows; limit(5) already caps what is collected to the driver.
df.limit(5).toPandas()


                                                                                

Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,7202016,38917982,2015-07-19,28943674,Bianca,Cute and cozy place. Perfect location to every...
1,7202016,39087409,2015-07-20,32440555,Frank,Kelly has a great room in a very central locat...
2,7202016,39820030,2015-07-26,37722850,Ian,"Very spacious apartment, and in a great neighb..."
3,7202016,40813543,2015-08-02,33671805,George,Close to Seattle Center and all it has to offe...
4,7202016,41986501,2015-08-10,34959538,Ming,Kelly was a great host and very accommodating ...


In [12]:
# Reviews whose `comments` field is missing.
# NOTE(review): this output shows reviewer names -- consider redacting before sharing.
df.where(F.col("comments").isNull()).show()

+----------+--------+----------+-----------+-------------+--------+
|listing_id|      id|      date|reviewer_id|reviewer_name|comments|
+----------+--------+----------+-----------+-------------+--------+
|    461567|11614247|2014-04-09|   12120141|        Abbey|    NULL|
|      9460|10563024|2014-02-24|   12498029|        Debra|    NULL|
|    910784| 9950520|2014-01-21|     179481|       Enrico|    NULL|
|     10695|   52378|2010-06-13|     105412|          Wei|    NULL|
|   1018204|10024984|2014-01-26|   10571694|       Jordan|    NULL|
|   6079216|34824019|2015-06-12|   31556342|         Mack|    NULL|
|   3354614|18103248|2014-08-21|   12426758|         Jeff|    NULL|
|   3554558|24863045|2015-01-03|   24488791|      Eleanor|    NULL|
|   1790020|15640556|2014-07-13|   16884291|      Michael|    NULL|
|     23430| 8347394|2013-10-27|    5034901|          Jim|    NULL|
|    774659| 7116754|2013-09-07|    7654662|    Elizabeth|    NULL|
|    585418|10782872|2014-03-07|   11979005|    

In [9]:
df.dtypes  # column types as produced by inferSchema in the load cell

[('listing_id', 'int'),
 ('id', 'int'),
 ('date', 'date'),
 ('reviewer_id', 'int'),
 ('reviewer_name', 'string'),
 ('comments', 'string')]

In [None]:
def drop_unneeded_columns(df: DataFrame, words: list) -> DataFrame:
    """Return ``df`` without any column whose name contains one of ``words``.

    Matching is a plain substring test, so a short keyword matches every
    column that contains it (e.g. "id" would match "listing_id").

    Args:
        df: input DataFrame.
        words: keywords to look for inside column names.

    Returns:
        A new DataFrame with the matching columns dropped.
    """
    doomed = []
    for name in df.columns:
        for keyword in words:
            if keyword in name:
                doomed.append(name)
                break  # one match is enough; avoid listing the column twice
    return df.drop(*doomed)

def transform_columns(df: DataFrame) -> DataFrame:
    """Convert string columns that encode booleans or dollar prices.

    A string column is converted only if it has at least one non-null value
    and *every* non-null value matches one of these patterns:

    * boolean -- every non-null value is ``'t'`` or ``'f'``. The column becomes
      boolean: ``'t'`` -> True, anything else (including NULL) -> False.
    * price -- every non-null value starts with ``'$'``. ``'$'`` and ``','``
      are stripped and the result is cast to double.

    Non-string columns are returned unchanged. All per-column statistics are
    computed in a single aggregation, so the input is scanned once instead of
    once or twice per column.

    Args:
        df: input DataFrame.

    Returns:
        A new DataFrame with the detected columns converted.
    """
    string_cols = [name for name, dtype in df.dtypes if dtype == "string"]
    if not string_cols:
        return df

    # Build one aggregation row with, for each string column i:
    #   nn_i  - number of non-null values
    #   tf_i  - number of values equal to 't' or 'f'
    #   usd_i - number of values starting with '$'
    # F.count ignores NULLs, and F.when(...) without otherwise() yields NULL
    # when the condition is false, so tf_i/usd_i count only matching rows.
    # Column objects are used instead of SQL strings so names containing
    # spaces or punctuation do not break the expression.
    aggregations = []
    for i, name in enumerate(string_cols):
        column = F.col(name)
        aggregations += [
            F.count(column).alias(f"nn_{i}"),
            F.count(F.when(column.isin("t", "f"), True)).alias(f"tf_{i}"),
            F.count(F.when(column.rlike(r"^\$"), True)).alias(f"usd_{i}"),
        ]
    stats = df.agg(*aggregations).first()

    boolean_cols, price_cols = [], []
    for i, name in enumerate(string_cols):
        non_null = stats[f"nn_{i}"]
        if non_null == 0:
            # With no non-null values every "all values match" test is
            # vacuously true; skip rather than misclassify the column.
            continue
        if stats[f"tf_{i}"] == non_null:
            boolean_cols.append(name)
        elif stats[f"usd_{i}"] == non_null:
            price_cols.append(name)

    for bc in boolean_cols:
        df = df.withColumn(bc, F.when(F.col(bc) == "t", True).otherwise(False))

    # Remove non-numeric currency characters ('$', ',') before casting.
    for pc in price_cols:
        df = df.withColumn(pc, F.regexp_replace(F.col(pc), "[$,]", "").cast(DoubleType()))

    return df

In [None]:
# Keywords for drop_unneeded_columns: any column whose name contains one of
# these substrings is removed. Defined here so the cell runs on a fresh kernel.
# TODO(review): fill in the intended keywords. Beware short ones such as "id",
# which would also match listing_id and reviewer_id.
column_keywords_to_exclude: list = []
df_trimmed = df.transform(drop_unneeded_columns, column_keywords_to_exclude)

In [None]:
# Apply boolean / price normalisation to the trimmed reviews.
df_clean = transform_columns(df_trimmed)

In [None]:
# Mark df_clean for caching so the preview and write cells below reuse it.
# cache() is lazy: data is only materialised by the next action.
df_clean.cache()


In [None]:
df_clean.dtypes  # confirm which columns transform_columns converted

In [None]:
# Preview the cleaned rows; limit(5) already caps what is collected.
df_clean.limit(5).toPandas()


In [None]:
# Write a 20-row sample of the cleaned reviews as CSV with a header row.
# escape='"' writes embedded quotes as "", matching how the input was read.
# ("multiline" is a read-only CSV option, so it is not set here.)
# NOTE(review): the path says "clean_listings_10", but this notebook cleans
# reviews and writes 20 rows -- confirm the intended destination.
# NOTE(review): limit() without orderBy() does not guarantee which rows are picked.
(
    df_clean.limit(20)
    .write
    .option("header", True)
    .option("escape", '"')
    .mode("overwrite")
    .csv("airbnb/clean_listings_10")
)


In [None]:
spark.stop()  # shut down the Spark session created/reused at the top of the notebook