# Xử lý trước khi đưa vào Asset

# Import Library

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import udf, to_timestamp

In [6]:
# Create SparkSession object
spark = SparkSession.builder \
                    .appName("HomeWork-W6") \
                    .getOrCreate()

In [8]:
spark_df = spark.read.parquet("/content/drive/MyDrive/Colab Notebooks/202204.pq")

In [9]:
spark_df.show(5)

+-----------+--------------------+--------------------+--------------------+---------------+----------+--------------------+--------------------+----------+-----+--------+-------------+--------------------+-----------------+----------------+
|   video_id|               title|         publishedAt|           channelId|   channelTitle|categoryId|       trending_date|                tags|view_count|likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|
+-----------+--------------------+--------------------+--------------------+---------------+----------+--------------------+--------------------+----------+-----+--------+-------------+--------------------+-----------------+----------------+
|zoHGxJKjC_Y|Heiratsantrag, di...|2022-04-01T16:36:48Z|UCm3_j4RLEzgMovQT...|   Drachen Lord|        24|2022-04-02T00:00:00Z|drachenlord origi...|    126194| 4922|       0|         2069|https://i.ytimg.c...|            False|           False|
|s38-OigKoIU|Nachgefragt: Panz..

In [10]:
spark_df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)



In [7]:
def replace_str(value: str):
    return value.replace("default", "maxresdefault")

def format_date(value: str):
    return value.replace("T", " ").replace("Z", "")

In [11]:
date_format = udf(format_date, StringType())
spark_df = spark_df.withColumn("publishedAt", date_format(spark_df["publishedAt"]))

In [12]:
spark_df.select("publishedAt").show(2)

+-------------------+
|        publishedAt|
+-------------------+
|2022-04-01 16:36:48|
|2022-04-01 11:03:23|
+-------------------+
only showing top 2 rows



In [13]:
# Convert date type of column publishedAt to datetime data type
spark_df = spark_df.withColumn("publishedAt", to_timestamp("publishedAt"))

In [14]:
# Convert date type of column categoryId to integer data type
spark_df = spark_df.withColumn("categoryId", spark_df["categoryId"].cast(IntegerType()))

In [15]:
# trending_date replace to format date
spark_df = spark_df.withColumn("trending_date", date_format(spark_df["trending_date"]))

In [16]:
spark_df.select("trending_date").show(2)

+-------------------+
|      trending_date|
+-------------------+
|2022-04-02 00:00:00|
|2022-04-02 00:00:00|
+-------------------+
only showing top 2 rows



In [17]:
# Convert date type of column trending_date to datetime data type
spark_df = spark_df.withColumn("trending_date", to_timestamp("trending_date"))

In [18]:
# Convert date type of column view_count to integer data type
spark_df = spark_df.withColumn("view_count", spark_df["view_count"].cast(IntegerType()))

In [19]:
# Convert date type of column likes to integer data type
spark_df = spark_df.withColumn("likes", spark_df["likes"].cast(IntegerType()))

In [20]:
# Convert date type of column dislikes to integer data type
spark_df = spark_df.withColumn("dislikes", spark_df["dislikes"].cast(IntegerType()))

In [21]:
# Convert date type of column comment_count to integer data type
spark_df = spark_df.withColumn("comment_count", spark_df["comment_count"].cast(IntegerType()))

In [22]:
# thumbnail_link replace from default to maxresdefault
link_convert = udf(replace_str, StringType())
spark_df = spark_df.withColumn("thumbnail_link", link_convert(spark_df["thumbnail_link"]))

In [27]:
spark_df.select("thumbnail_link").collect()[17][0]

'https://i.ytimg.com/vi/EfP1h_3u0Lk/maxresdefault.jpg'

In [28]:
# Check
spark_df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)



In [None]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import round
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Bucketizer
from pyspark.ml.regression import LinearRegression

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline
import warnings
warnings.filterwarnings('ignore')

# Read Data

In [None]:
# Read data from CSV file
df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/properties_2016.csv', sep=',', header=True, inferSchema=True, nullValue='NA')

In [None]:
# Get number of records
print(f"Dataset have {df.count()} records")

Dataset have 2985217 records


In [None]:
# Get Shape dataset
print(f"column: {len(df.columns)}\nrow: {df.count()}")

column: 58
row: 2985217


In [None]:
# View five records data
df.show(5)

+--------+---------------------+------------------------+------------+-----------+----------+-------------------+---------------------+-----------------+----------+------------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----+------------+-----------+------------+---------------+--------------+---------------------+--------+----------+-----------------+-------+-----------+------------+-----------+-----------+-------------------------+---------------------+------------------+----------------------+------------+--------------+--------------------+-----------+-------+-----------+-------------------+----------------------+-------+------------------+------------------+---------+---------------+-------------+--------------------------+-----------------+--------------+---------------------+---------+------------------+------------------+-------------------+
|parcelid|airconditioningtypeid|arch

In [None]:
df.printSchema()

root
 |-- parcelid: integer (nullable = true)
 |-- airconditioningtypeid: integer (nullable = true)
 |-- architecturalstyletypeid: integer (nullable = true)
 |-- basementsqft: integer (nullable = true)
 |-- bathroomcnt: double (nullable = true)
 |-- bedroomcnt: double (nullable = true)
 |-- buildingclasstypeid: integer (nullable = true)
 |-- buildingqualitytypeid: integer (nullable = true)
 |-- calculatedbathnbr: double (nullable = true)
 |-- decktypeid: integer (nullable = true)
 |-- finishedfloor1squarefeet: integer (nullable = true)
 |-- calculatedfinishedsquarefeet: double (nullable = true)
 |-- finishedsquarefeet12: integer (nullable = true)
 |-- finishedsquarefeet13: integer (nullable = true)
 |-- finishedsquarefeet15: integer (nullable = true)
 |-- finishedsquarefeet50: integer (nullable = true)
 |-- finishedsquarefeet6: integer (nullable = true)
 |-- fips: integer (nullable = true)
 |-- fireplacecnt: integer (nullable = true)
 |-- fullbathcnt: integer (nullable = true)
 |-- gar

In [None]:
df.describe().show()

+-------+--------------------+---------------------+------------------------+-----------------+------------------+------------------+-------------------+---------------------+------------------+----------+------------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+---------------------+-------------------+--------------------+------------------+-------+------------------+------------+-----------+-----------+-------------------------+---------------------+--------------------+----------------------+-----------------+------------------+--------------------+-----------------+------------------+-----------+-------------------+----------------------+------------------+------------------+------------------+------------------+------------------+--------------------------+------------------+------------

# Clean Data

In [None]:
# Drop duplicates
df = df.dropDuplicates()

In [None]:
# Get the missing value of each column
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()

+--------+---------------------+------------------------+------------+-----------+----------+-------------------+---------------------+-----------------+----------+------------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-----+------------+-----------+------------+---------------+--------------+---------------------+--------+---------+-----------------+-------+-----------+------------+-----------+-----------+-------------------------+---------------------+------------------+----------------------+------------+--------------+--------------------+-----------+-------+-----------+-------------------+----------------------+-------+------------------+------------------+---------+---------------+-------------+--------------------------+-----------------+--------------+---------------------+---------+------------------+------------------+-------------------+
|parcelid|airconditioningtypeid|arch

In [None]:
# Visualize missing value on each column
pandas_df = df.toPandas()
missing_count = pandas_df.isna().sum()
sns.barplot(x=missing_count.index, y=missing_count.values)
plt.title('Numbers Missing Value on each column')
plt.xlabel('Column')
plt.ylabel('Numbers Missing')
plt.show()

In [None]:
# Drop columns that are more than 60% missing
def column_dropper(df, threshold):
    total_records = df.count()
    for col in df.columns:
        missing = df.filter(df[col].isNull()).count()
        missing_percent = missing / total_records
        if missing_percent > threshold:
            df = df.drop(col)
    return df

df = column_dropper(df, 0.6)

In [None]:
# columns remaining after deletion
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()

+--------+---------------------+------------------------+------------+-----------+----------+-------------------+---------------------+-----------------+----------+------------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-----+------------+-----------+------------+---------------+--------------+---------------------+--------+---------+-----------------+-------+-----------+------------+-----------+-----------+-------------------------+---------------------+------------------+----------------------+------------+--------------+--------------------+-----------+-------+-----------+-------------------+----------------------+-------+------------------+------------------+---------+---------------+-------------+--------------------------+-----------------+--------------+---------------------+---------+------------------+------------------+-------------------+
|parcelid|airconditioningtypeid|arch

In [None]:
df.show()

In [None]:
PARCELID: 0
BATHROOMCNT: 11462
BEDROOMCNT: 11450
BUILDINGQUALITYTYPEID: 1046729
CALCULATEDBATHNBR: 128912
CALCULATEDFINISHEDSQUAREFEET: 55565
FINISHEDSQUAREFEET12: 276033
FIPS: 11437
FULLBATHCNT: 128912
HEATINGORSYSTEMTYPEID: 1178816
LATITUDE: 11437
LONGITUDE: 11437
LOTSIZESQUAREFEET: 276099
PROPERTYCOUNTYLANDUSECODE: 12277
PROPERTYLANDUSETYPEID: 11437
PROPERTYZONINGDESC: 1006588
RAWCENSUSTRACTANDBLOCK: 11437
REGIONIDCITY: 62845
REGIONIDCOUNTY: 11437
REGIONIDZIP: 13980
ROOMCNT: 11475
UNITCNT: 1007727
YEARBUILT: 59928
STRUCTURETAXVALUEDOLLARCNT: 54982
TAXVALUEDOLLARCNT: 42550
ASSESSMENTYEAR: 11439
LANDTAXVALUEDOLLARCNT: 67733
TAXAMOUNT: 31250
CENSUSTRACTANDBLOCK: 75126

In [None]:
# Fill miss value
values = {
    'bathroomcnt': 'value1',
    'bedroomcnt': 'value2',
    'buildingqualitytypeid': "",
    "CALCULATEDBATHNBR": 128912,
    "CALCULATEDFINISHEDSQUAREFEET": 55565,
    "FINISHEDSQUAREFEET12": 276033,
    "FIPS": 11437,
    "FULLBATHCNT": 128912,
    "HEATINGORSYSTEMTYPEID": 1178816,
    "LATITUDE": 11437,
    "LONGITUDE": 11437,
    "LOTSIZESQUAREFEET": 276099,
    "PROPERTYCOUNTYLANDUSECODE": 12277,
    "PROPERTYLANDUSETYPEID": 11437,
    "PROPERTYZONINGDESC": 1006588,
    "RAWCENSUSTRACTANDBLOCK": 11437,
    "REGIONIDCITY": 62845,
    "REGIONIDCOUNTY": 11437,
    "REGIONIDZIP": 13980,
    "ROOMCNT": 11475,
    "UNITCNT": 1007727,
    "YEARBUILT": 59928,
    "STRUCTURETAXVALUEDOLLARCNT": 54982,
    "TAXVALUEDOLLARCNT": 42550,
    "ASSESSMENTYEAR": 11439,
    "LANDTAXVALUEDOLLARCNT": 67733,
    "TAXAMOUNT": 31250,
    "CENSUSTRACTANDBLOCK": 75126
}
filled_df = df.fillna(values)

58

In [None]:
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()

# Feature Engineering

In [None]:
# One-hot encoding for 'bathroomcnt'
encoder_bathroomcnt = OneHotEncoder(inputCols=['bathroomcnt'], outputCols=['bathroomcnt_dummy'])
df = encoder_bathroomcnt.fit(df).transform(df)


In [None]:
# One-hot encoding for 'bedroomcnt'
encoder_bedroomcnt = OneHotEncoder(inputCols=['bedroomcnt'], outputCols=['bedroomcnt_dummy'])
df = encoder_bedroomcnt.fit(df).transform(df)

In [None]:
# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=['roomcnt', 'latitude', 'longitude', 'bathroomcnt_dummy', 'bedroomcnt_dummy'], outputCol='features')
df = assembler.transform(df)

# Build Linear Regression Model

In [None]:
# Split the data
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Build the model
regression = LinearRegression(featuresCol='features', labelCol='duration')
model = regression.fit(train_data)

In [None]:
# Make predictions
predictions = model.transform(test_data)

# Evaluate Model

In [None]:
# Evaluate the model
evaluator = RegressionEvaluator(labelCol='duration', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print("Root Mean Square Error (RMSE) on test data =", rmse)

# Print coefficients and intercept for interpretation
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

# Pre-Processing Data

### 1. Check Data

In [None]:
df = spark.read.csv("properties_2016.csv", header=True, inferSchema=True)

In [None]:
df.limit(10)

In [None]:
# convert all column names to uppercase
for col in df.columns:
  df = df.withColumnRenamed(col, col.upper())

### 2. check descriptive statistics

In [None]:
df.describe().limit(20)

### 3. Check DataType

In [None]:
df.printSchema()

### 4. Check Number columns, rows current

In [None]:
# Columns
len(df.columns)

In [None]:
# Rows
df.count()

### 5. Drop Duplicates

In [None]:
df = df.dropDuplicates()

### 6. Check Miss Value

In [None]:
def check_null_count():
    for column in df.columns:
        null_count = df.filter(df[column].isNull()).count()
        print(f"{column}: {null_count}")

In [None]:
# Số lượng giá trị khuyết thiếu của từng cột
check_null_count()

##### Drop columns with more than 60% missing

In [None]:
def column_dropper(df, threshold):
    # Takes a dataframe and threshold for missing values. Returns a dataframe.
    total_records = df.count()
    for col in df.columns:
        # Calculate the percentage of missing values
        missing = df.where(df[col].isNull()).count()
        missing_percent = missing / total_records
        # Drop column if percent of missing is more than threshold
        if missing_percent > threshold:
            df = df.drop(col)
    return df

# Drop columns that are more than 60% missing
df = column_dropper(df, 0.6)

In [None]:
check_null_count()

### 7. Outlier Filtering

In [None]:
mean_val = df.agg({'BATHROOMCNT': 'mean'}).collect()[0][0]
stddev_val = df.agg({'BATHROOMCNT': 'stddev'}).collect()[0][0]

low_bound = mean_val - (3 * stddev_val)
hi_bound = mean_val + (3 * stddev_val)

df = df.where((df['BATHROOMCNT'] < hi_bound) & (df['BATHROOMCNT'] > low_bound))

### 8. Adjust Data

In [None]:
mean = df.agg({'BATHROOMCNT': 'mean'}).collect()[0][0]
stddev = df.agg({'BATHROOMCNT': 'stddev'}).collect()[0][0]
# Create a new column with the scaled data
df = df.withColumn("ztrans_days", (df['BATHROOMCNT'] - mean) / stddev)
df.agg({'ztrans_days': 'mean'}).collect()
df.agg({'ztrans_days': 'stddev'}).collect()

# Feature Engineering

### 1. Bucketing

In [None]:
splits = [0, 1, 2, 3, 4, float('Inf')]

# Create bucketing transformer
buck = Bucketizer(splits=splits, inputCol='TAXAMOUNT', outputCol='TAXA')

# Apply transformer
df = buck.transform(df)

# Inspect results
df[['TAXAMOUNT', 'TAXA']].show()

### 2. One-hot Encoding

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Map strings to numbers with string indexer
string_indexer = StringIndexer(inputCol='ROOMCNT', outputCol='ROOM_Index')
indexed_df = string_indexer.fit(df).transform(df)

# Onehot encode indexed values
encoder = OneHotEncoder(inputCol='ROOM_Index', outputCol='ROOM_Vec')
encoded_df = encoder.fit(indexed_df).transform(indexed_df)

# Inspect the transformation steps
encoded_df[['ROOMCNT', 'ROOM_Index', 'ROOM_Vec']].show(truncate=100)