# v1

In [6]:
from pyspark.sql import SparkSession

# Initialize SparkSession with appropriate memory configuration
spark = SparkSession.builder \
    .appName("Large Parquet File Analysis") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/13 12:43:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [42]:
# Path to your parquet file
file_path = "data/all_reviews/1m_sample.parquet"

# Read the parquet file
df = spark.read.parquet(file_path)

# Display the schema
df.printSchema()



root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- writt

In [43]:
# Show a sample of the data (first 10 rows)
df.head(1)

                                                                                

[Row(recommendationid='148571515', appid='10', game='Counter-Strike', author_steamid='76561198334618026', author_num_games_owned='61', author_num_reviews='9', author_playtime_forever='2000', author_playtime_last_two_weeks='2000', author_playtime_at_review='733', author_last_played='1698334477', language='russian', review='игра на века', timestamp_created='1697847862', timestamp_updated='1697847862', voted_up='1', votes_up='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='1', received_for_free='0', written_during_early_access='0')]

In [44]:
from pyspark.sql.functions import col, rand, when, isnan, length

def show_null_or_zero_samples(dataframe, column_name, sample_size=10):
    """
    Display random samples of records where the specified column is null or zero.
    
    Parameters:
    dataframe (DataFrame): The PySpark DataFrame to analyze
    column_name (str): The name of the column to check for null/zero values
    sample_size (int): Number of sample records to display (default: 10)
    """
    # Check if column exists in the dataframe
    if column_name not in dataframe.columns:
        print(f"Column '{column_name}' not found in the dataframe.")
        print(f"Available columns: {dataframe.columns}")
        return
    
    # Get the data type of the column
    column_type = dict(dataframe.dtypes)[column_name]
    
    # Create appropriate filter condition based on column type
    if column_type in ['int', 'bigint', 'double', 'float', 'decimal']:
        # For numeric columns, check for nulls, NaN, or zero
        filter_condition = (col(column_name).isNull() | 
                           (when(column_type in ['double', 'float'], isnan(col(column_name))).otherwise(False)) | 
                           (col(column_name) == 0))
    elif column_type in ['boolean']:
        # For boolean columns, check for nulls or False
        filter_condition = (col(column_name).isNull() | (col(column_name) == False))
    elif column_type in ['timestamp', 'date']:
        # For timestamp/date columns, check for nulls only
        filter_condition = col(column_name).isNull()
    elif column_type == 'string':
        # For string columns, check for nulls, empty strings, or whitespace-only strings
        filter_condition = (col(column_name).isNull() | 
                           (col(column_name) == "") | 
                           (length(trim(col(column_name))) == 0))
    else:
        # For other types, check for nulls only
        filter_condition = col(column_name).isNull()
    
    # Filter the dataframe and get random samples
    filtered_df = dataframe.filter(filter_condition)
    
    # Count the number of matching records
    count = filtered_df.count()
    
    if count == 0:
        print(f"No records found where '{column_name}' is null or zero.")
        return
    
    # Order by random to get random samples
    samples = filtered_df.orderBy(rand()).limit(sample_size)
    
    print(f"Found {count} records where '{column_name}' is null or zero.")
    print(f"Showing {min(sample_size, count)} random samples:")
    samples.show(sample_size, truncate=False)
    
    # Return the filtered dataframe in case further analysis is needed
    return filtered_df



In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, LongType, BooleanType, FloatType, TimestampType



In [34]:
# Assuming df is your existing DataFrame
df_processed = df \
    .withColumn("appid", col("appid").cast(IntegerType())) \
    .withColumn("author_num_games_owned", col("author_num_games_owned").cast(IntegerType())) \
    .withColumn("author_num_reviews", col("author_num_reviews").cast(IntegerType())) \
    .withColumn("author_playtime_forever", col("author_playtime_forever").cast(IntegerType())) \
    .withColumn("author_playtime_last_two_weeks", col("author_playtime_last_two_weeks").cast(IntegerType())) \
    .withColumn("author_playtime_at_review", col("author_playtime_at_review").cast(IntegerType())) \
    .withColumn("author_last_played", col("author_last_played").cast(LongType())) \
    .withColumn("votes_up", col("votes_up").cast(IntegerType())) \
    .withColumn("votes_funny", col("votes_funny").cast(IntegerType())) \
    .withColumn("weighted_vote_score", col("weighted_vote_score").cast(FloatType())) \
    .withColumn("comment_count", col("comment_count").cast(IntegerType())) \
    .withColumn("voted_up", col("voted_up").cast(BooleanType())) \
    .withColumn("steam_purchase", col("steam_purchase").cast(BooleanType())) \
    .withColumn("received_for_free", col("received_for_free").cast(BooleanType())) \
    .withColumn("written_during_early_access", col("written_during_early_access").cast(BooleanType()))


    # .withColumn("timestamp_created", col("timestamp_created").cast(TimestampType())) \
    # .withColumn("timestamp_updated", col("timestamp_updated").cast(TimestampType()))


In [35]:
df_processed.printSchema()


root
 |-- recommendationid: string (nullable = true)
 |-- appid: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: long (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: boolean (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: float (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 

In [None]:
show_null_or_zero_samples(df_processed, "game")


In [36]:
from pyspark.sql.functions import count, when, isnan, col

# Get the data types of each column
column_types = dict(df_processed.dtypes)

# Create a list to store expressions for counting nulls
null_count_expressions = []

for c in df_processed.columns:
    # For numeric columns (double/float), check both null and NaN
    if column_types[c] in ['integer', 'double', 'float']:
        null_count_expressions.append(count(when(col(c).isNull() | isnan(c), c)).alias(c))
    # For other types, only check for null
    else:
        null_count_expressions.append(count(when(col(c).isNull(), c)).alias(c))

# Apply the expressions to the DataFrame
null_counts = df_processed.select(null_count_expressions)


In [37]:
null_counts.head(50)


                                                                                

[Row(recommendationid=0, appid=17, game=51, author_steamid=14, author_num_games_owned=20, author_num_reviews=19, author_playtime_forever=16, author_playtime_last_two_weeks=16, author_playtime_at_review=16, author_last_played=16, language=15, review=15, timestamp_created=214, timestamp_updated=226, voted_up=25962, votes_up=15369, votes_funny=13523, weighted_vote_score=12022, comment_count=10759, steam_purchase=16461, received_for_free=14143, written_during_early_access=12414)]

In [40]:
# Example usage:
show_null_or_zero_samples(df_processed, "game")


Found 51 records where 'game' is null or zero.
Showing 10 random samples:




+-------------------+-------+----+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------

                                                                                

DataFrame[recommendationid: string, appid: int, game: string, author_steamid: string, author_num_games_owned: int, author_num_reviews: int, author_playtime_forever: int, author_playtime_last_two_weeks: int, author_playtime_at_review: int, author_last_played: bigint, language: string, review: string, timestamp_created: string, timestamp_updated: string, voted_up: boolean, votes_up: int, votes_funny: int, weighted_vote_score: float, comment_count: int, steam_purchase: boolean, received_for_free: boolean, written_during_early_access: boolean]

# v2

In [1]:
from pyspark.sql import SparkSession

# Initialize SparkSession with appropriate memory configuration
spark = SparkSession.builder \
    .appName("Large Parquet File Analysis") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/13 15:27:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import pandas as pd
import numpy as np
import random
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, rand, trim, length
from pyspark.sql.types import IntegerType, LongType, BooleanType, FloatType

# 1. Load data and print schema
def load_data(file_path):
    """
    Load the parquet file and print its schema
    """    
    # Load the dataset
    df = spark.read.parquet(file_path)
    
    # Print the schema
    print("Original Schema:")
    df.printSchema()
    
    return df

file_path = "data/all_reviews/1m_sample.parquet"

# Read the parquet file
df = load_data(file_path)



                                                                                

Original Schema:
root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable =

In [37]:

# 2. Check for null values
def check_null_values(df):
    """
    Check for null values in all columns and print the count
    """
    # Create expressions for counting nulls in each column
    null_count_expressions = []
    
    for c in df.columns:
        column_type = dict(df.dtypes)[c]
        
        # For numeric columns (int, double, float), check both null and NaN
        if column_type in ['int', 'bigint', 'double', 'float']:
            null_count_expressions.append(count(when(col(c).isNull() | isnan(c), c)).alias(c))
        # For string columns, check for nulls and empty strings
        elif column_type == 'string':
            null_count_expressions.append(count(when(col(c).isNull() | (col(c) == ""), c)).alias(c))
        # For other types, only check for null
        else:
            null_count_expressions.append(count(when(col(c).isNull(), c)).alias(c))
    
    # Apply the expressions to the DataFrame
    null_counts = df.select(null_count_expressions)
    
    # print("Null value counts for each column:")
    # null_counts.show(truncate=False)
    
    return null_counts


null_counts = check_null_values(df)

In [38]:
null_counts.head(23)

                                                                                

[Row(recommendationid=0, appid=11, game=51, author_steamid=14, author_num_games_owned=14, author_num_reviews=14, author_playtime_forever=14, author_playtime_last_two_weeks=14, author_playtime_at_review=14, author_last_played=15, language=15, review=15, timestamp_created=214, timestamp_updated=226, voted_up=233, votes_up=201, votes_funny=230, weighted_vote_score=227, comment_count=200, steam_purchase=212, received_for_free=222, written_during_early_access=211)]

In [40]:
# 3. Convert data types
from pyspark.sql.functions import col, from_unixtime, to_timestamp
from pyspark.sql.types import IntegerType, FloatType, BooleanType, TimestampType

def convert_data_types(df):
    """
    Convert columns to appropriate data types
    """
    df_processed = df \
        .withColumn("author_num_games_owned", col("author_num_games_owned").cast(IntegerType())) \
        .withColumn("author_num_reviews", col("author_num_reviews").cast(IntegerType())) \
        .withColumn("author_playtime_forever", col("author_playtime_forever").cast(IntegerType())) \
        .withColumn("author_playtime_at_review", col("author_playtime_at_review").cast(IntegerType())) \
        .withColumn("votes_up", col("votes_up").cast(IntegerType())) \
        .withColumn("votes_funny", col("votes_funny").cast(IntegerType())) \
        .withColumn("weighted_vote_score", col("weighted_vote_score").cast(FloatType())) \
        .withColumn("comment_count", col("comment_count").cast(IntegerType())) \
        .withColumn("steam_purchase", col("steam_purchase").cast(BooleanType())) \
        .withColumn("received_for_free", col("received_for_free").cast(BooleanType())) \
        .withColumn("written_during_early_access", col("written_during_early_access").cast(BooleanType()))
    
    # Convert Unix timestamps to proper datetime format
    df_processed = df_processed \
        .withColumn("author_last_played", to_timestamp(from_unixtime(col("author_last_played").cast("long")))) \
        .withColumn("timestamp_created", to_timestamp(from_unixtime(col("timestamp_created").cast("long")))) \
        .withColumn("timestamp_updated", to_timestamp(from_unixtime(col("timestamp_updated").cast("long"))))
    
    print("\nConverted Schema:")
    df_processed.printSchema()
    
    return df_processed



df_dtypes_changed = convert_data_types(df)


Converted Schema:
root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: timestamp (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: timestamp (nullable = true)
 |-- timestamp_updated: timestamp (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: float (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- received_for_free: 

In [41]:
null_counts = check_null_values(df_dtypes_changed)
null_counts.head(50)

                                                                                

[Row(recommendationid=0, appid=11, game=51, author_steamid=14, author_num_games_owned=20, author_num_reviews=19, author_playtime_forever=16, author_playtime_last_two_weeks=14, author_playtime_at_review=16, author_last_played=16, language=15, review=15, timestamp_created=25949, timestamp_updated=21078, voted_up=233, votes_up=15369, votes_funny=13523, weighted_vote_score=12022, comment_count=10759, steam_purchase=16461, received_for_free=14143, written_during_early_access=12414)]

In [42]:
df_dtypes_changed.show(100)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+-------------------+----------+------------------------------------+-------------------+-------------------+----------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review| author_last_played|  language|                              review|  timestamp_created|  timestamp_updated|  voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|
+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+--------------

In [7]:
from pyspark.sql.functions import col, countDistinct, desc, isnan, when, count

def analyze_string_columns_for_conversion(df, columns):
    """
    Analyze string columns before conversion to identify potential issues
    
    Parameters:
    df (DataFrame): The PySpark DataFrame to analyze
    columns (list): List of column names to analyze
    """
    for column in columns:
        print(f"\n{'='*80}")
        print(f"ANALYSIS FOR COLUMN: {column}")
        print(f"{'='*80}")
        
        # 1. Count total unique values
        unique_count = df.select(countDistinct(col(column))).collect()[0][0]
        print(f"Total unique values: {unique_count}")
        
        # 2. Count null or empty strings
        null_empty_count = df.filter((col(column).isNull()) | (col(column) == "") | (col(column).rlike('^\\s*$'))).count()
        print(f"Count of null or empty strings: {null_empty_count}")
        
        # 3. Show sample unique values (up to 20)
        unique_values = df.select(column).distinct().limit(20).rdd.flatMap(lambda x: x).collect()
        print(f"Sample unique values (up to 20): {unique_values}")
        
        # 4. Show top 10 most frequent values
        print("Top 10 most frequent values:")
        df.groupBy(column).count().orderBy(desc("count")).limit(20).show(truncate=False)
        
        # 5. Check for non-convertible values based on target data type
        if column in ["voted_up", "steam_purchase", "received_for_free", "written_during_early_access"]:
            # Boolean columns - check for values other than 0, 1, true, false, etc.
            non_bool_values = df.filter(
                ~col(column).isNull() & 
                ~col(column).rlike("^(0|1|true|false|t|f|yes|no|y|n)$")
            )
            non_bool_count = non_bool_values.count()
            print(f"Values that may not convert to boolean: {non_bool_count}")
            if non_bool_count > 0:
                print("Sample problematic values:")
                non_bool_values.select(column).distinct().limit(10).show(truncate=False)
                
        elif column in ["votes_up", "votes_funny", "comment_count"]:
            # Integer columns - check for non-numeric values
            non_int_values = df.filter(
                ~col(column).isNull() & 
                ~col(column).rlike("^-?\\d+$")
            )
            non_int_count = non_int_values.count()
            print(f"Values that may not convert to integer: {non_int_count}")
            if non_int_count > 0:
                print("Sample problematic values:")
                non_int_values.select(column).distinct().limit(10).show(truncate=False)
                
        elif column == "weighted_vote_score":
            # Float column - check for non-numeric values
            non_float_values = df.filter(
                ~col(column).isNull() & 
                ~col(column).rlike("^-?\\d+(\\.\\d*)?$")
            )
            non_float_count = non_float_values.count()
            print(f"Values that may not convert to float: {non_float_count}")
            if non_float_count > 0:
                print("Sample problematic values:")
                non_float_values.select(column).distinct().limit(10).show(truncate=False)
        
        # 6. Check for leading/trailing whitespace
        whitespace_values = df.filter(
            ~col(column).isNull() & 
            ((col(column) != trim(col(column))))
        )
        whitespace_count = whitespace_values.count()
        print(f"Values with leading/trailing whitespace: {whitespace_count}")
        
        # 7. Check for special characters that might cause issues
        special_char_values = df.filter(
            ~col(column).isNull() & 
            col(column).rlike("[^a-zA-Z0-9\\s\\.-]")
        )
        special_char_count = special_char_values.count()
        print(f"Values with special characters: {special_char_count}")



In [8]:
# 5. Drop function
def drop_null_records(df, column_name):
    """
    Drop records where the specified column has null values
    """
    # Check if column exists
    if column_name not in df.columns:
        print(f"Column '{column_name}' not found in the dataframe.")
        return df
    
    # Count before dropping
    count_before = df.count()
    
    # Drop null records for the specified column
    column_type = dict(df.dtypes)[column_name]
    
    if column_type in ['double', 'float']:
        df_cleaned = df.filter(~(col(column_name).isNull() | isnan(col(column_name))))
    elif column_type == 'string':
        df_cleaned = df.filter(~(col(column_name).isNull() | (col(column_name) == "")))
    else:
        df_cleaned = df.filter(col(column_name).isNotNull())
    
    # Count after dropping
    count_after = df_cleaned.count()
    
    print(f"Dropped {count_before - count_after} records where '{column_name}' is null.")
    print(f"Records before: {count_before}, Records after: {count_after}")
    
    return df_cleaned



In [45]:
null_counts = check_null_values(df_dtypes_changed)
null_counts.head(50)

                                                                                

[Row(recommendationid=0, appid=11, game=51, author_steamid=14, author_num_games_owned=20, author_num_reviews=19, author_playtime_forever=16, author_playtime_last_two_weeks=14, author_playtime_at_review=16, author_last_played=16, language=15, review=15, timestamp_created=25949, timestamp_updated=21078, voted_up=233, votes_up=15369, votes_funny=13523, weighted_vote_score=12022, comment_count=10759, steam_purchase=16461, received_for_free=14143, written_during_early_access=12414)]

### Here we do ANALYSIS

In [43]:
columns_to_analyze = [
    "game"
]
analyze_string_columns_for_conversion(df, columns_to_analyze)



ANALYSIS FOR COLUMN: game


                                                                                

Total unique values: 31933


                                                                                

Count of null or empty strings: 51
Sample unique values (up to 20): ['Warhammer: Chaosbane - Gold Boost', 'Smile For Me', 'Warhammer 40,000: Inquisitor - Prophecy', 'Combate Monero', 'KATANA KAMI: A Way of the Samurai Story ', 'Just Cause™ 4: Sea Dogs Vehicle Pack', 'Survive on Raft', '100% Orange Juice - Sora & Sham (Cuties) Character Pack', 'Sweet Dream Succubus - Nightmare Edition', 'Eleanor 3', 'The Legend of Bum-Bo', 'EEP 16 Expert', 'Atelier Ayesha: The Alchemist of Dusk DX', 'Kingdom Shell', 'Unlock The King', 'Cats are Liquid - A Better Place', 'Growing Up', 'Making Lovers', 'Music Visualizer Engine PC Live Wallpaper', 'Coromon']
Top 10 most frequent values:
+------------------------------+-----+
|game                          |count|
+------------------------------+-----+
|Counter-Strike 2              |67531|
|PUBG: BATTLEGROUNDS           |19717|
|Grand Theft Auto V            |14566|
|Terraria                      |10541|
|Tom Clancy's Rainbow Six Siege|10484|
|Garry's Mod 



Values with special characters: 297660


                                                                                

In [44]:
columns_to_analyze = [
    "game"
]
analyze_string_columns_for_conversion(df_dtypes_changed, columns_to_analyze)



ANALYSIS FOR COLUMN: game
Total unique values: 31933
Count of null or empty strings: 51
Sample unique values (up to 20): ['Warhammer: Chaosbane - Gold Boost', 'Smile For Me', 'Warhammer 40,000: Inquisitor - Prophecy', 'Combate Monero', 'KATANA KAMI: A Way of the Samurai Story ', 'Just Cause™ 4: Sea Dogs Vehicle Pack', 'Survive on Raft', '100% Orange Juice - Sora & Sham (Cuties) Character Pack', 'Sweet Dream Succubus - Nightmare Edition', 'Eleanor 3', 'The Legend of Bum-Bo', 'EEP 16 Expert', 'Atelier Ayesha: The Alchemist of Dusk DX', 'Kingdom Shell', 'Unlock The King', 'Cats are Liquid - A Better Place', 'Growing Up', 'Making Lovers', 'Music Visualizer Engine PC Live Wallpaper', 'Coromon']
Top 10 most frequent values:
+------------------------------+-----+
|game                          |count|
+------------------------------+-----+
|Counter-Strike 2              |67531|
|PUBG: BATTLEGROUNDS           |19717|
|Grand Theft Auto V            |14566|
|Terraria                      |10541



Values with special characters: 297660


                                                                                

In [142]:
# column_to_check = "game"  # Change this to any column you want to check
# print(f"\nShowing null samples for column: {column_to_check}")
# null_samples = show_null_samples(df, column_to_check)


### HERE WE DROP THE NULL VALUES FROM A COLUMN

In [23]:
print("\n--- STEP 5: Dropping null records ---")
column_to_drop = "votes_funny"  # Change this to the column you want to use for dropping
df_cleaned = drop_null_records(df_cleaned, column_to_drop)


--- STEP 5: Dropping null records ---




Dropped 20 records where 'votes_funny' is null.
Records before: 978000, Records after: 977980


                                                                                

In [24]:
null_counts = check_null_values(df_cleaned)
null_counts.head(50)

                                                                                

[Row(recommendationid=0, appid=0, game=0, author_steamid=0, author_num_games_owned=1, author_num_reviews=0, author_playtime_forever=0, author_playtime_last_two_weeks=0, author_playtime_at_review=0, author_last_played=0, language=0, review=0, timestamp_created=0, timestamp_updated=0, voted_up=0, votes_up=0, votes_funny=0, weighted_vote_score=0, comment_count=0, steam_purchase=0, received_for_free=0, written_during_early_access=0)]

In [36]:
columns_to_analyze = [
    "timestamp_updated"
]

analyze_string_columns_for_conversion(df_cleaned, columns_to_analyze)


ANALYSIS FOR COLUMN: timestamp_updated


                                                                                

Total unique values: 974774


                                                                                

Count of null or empty strings: 0


                                                                                

Sample unique values (up to 20): ['1620637239', '1618384512', '1609248330', '1590837154', '1537554521', '1527858001', '1505153979', '1493203391', '1511550161', '1450358520', '1391697605', '1336281579', '1696846636', '1631052104', '1548089221', '1614515902', '1514234510', '1625240844', '1625234777', '1631964489']
Top 10 most frequent values:


                                                                                

+-----------------+-----+
|timestamp_updated|count|
+-----------------+-----+
|1542902093       |5    |
|1606328919       |3    |
|1498585473       |3    |
|1606336271       |3    |
|1606329820       |3    |
|1637777554       |3    |
|1606378603       |3    |
|1669214816       |3    |
|1638289580       |3    |
|1606335092       |3    |
|1388740303       |3    |
|1561930979       |3    |
|1561803767       |3    |
|1684595050       |3    |
|1576036074       |3    |
|1606344651       |3    |
|1606328502       |3    |
|1511459801       |3    |
|1575130950       |3    |
|1606329879       |3    |
+-----------------+-----+

Values with leading/trailing whitespace: 0




Values with special characters: 0


                                                                                

## TESTING DRIVER CODE

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, to_timestamp, count, when, isnan
from pyspark.sql.types import IntegerType, FloatType, BooleanType, TimestampType

def load_parquet_file(file_path):
    """
    Load the parquet file and return the DataFrame
    """
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("SteamReviewsAnalysis") \
        .config("spark.driver.memory", "8g") \
        .getOrCreate()
    
    # Load the dataset
    df = spark.read.parquet(file_path)
    
    print("Original Schema:")
    df.printSchema()
    
    return df, spark

def drop_columns(df, columns_to_drop):
    """
    Drop specified columns if they exist in the DataFrame
    """
    existing_columns = df.columns
    columns_to_drop_existing = [col for col in columns_to_drop if col in existing_columns]
    
    if columns_to_drop_existing:
        print(f"Dropping columns: {columns_to_drop_existing}")
        df = df.drop(*columns_to_drop_existing)
    else:
        print("None of the specified columns exist in the DataFrame")
    
    return df

def convert_data_types(df):
    """
    Convert columns to appropriate data types
    """
    df_processed = df \
        .withColumn("author_num_games_owned", col("author_num_games_owned").cast(IntegerType())) \
        .withColumn("author_num_reviews", col("author_num_reviews").cast(IntegerType())) \
        .withColumn("author_playtime_forever", col("author_playtime_forever").cast(IntegerType())) \
        .withColumn("author_playtime_at_review", col("author_playtime_at_review").cast(IntegerType())) \
        .withColumn("votes_up", col("votes_up").cast(IntegerType())) \
        .withColumn("votes_funny", col("votes_funny").cast(IntegerType())) \
        .withColumn("weighted_vote_score", col("weighted_vote_score").cast(FloatType())) \
        .withColumn("comment_count", col("comment_count").cast(IntegerType())) \
        .withColumn("steam_purchase", col("steam_purchase").cast(BooleanType())) \
        .withColumn("received_for_free", col("received_for_free").cast(BooleanType())) \
        .withColumn("written_during_early_access", col("written_during_early_access").cast(BooleanType()))
    
    # Convert Unix timestamps to proper datetime format
    df_processed = df_processed \
        .withColumn("author_last_played", to_timestamp(from_unixtime(col("author_last_played").cast("long")))) \
        .withColumn("timestamp_created", to_timestamp(from_unixtime(col("timestamp_created").cast("long")))) \
        .withColumn("timestamp_updated", to_timestamp(from_unixtime(col("timestamp_updated").cast("long"))))
    
    print("\nConverted Schema:")
    df_processed.printSchema()
    
    return df_processed

def check_null_values(df):
    """
    Check for null values in all columns and print the count
    """
    # Create expressions for counting nulls in each column
    null_count_expressions = []
    
    for c in df.columns:
        column_type = dict(df.dtypes)[c]
        
        # For numeric columns (int, double, float), check both null and NaN
        if column_type in ['int', 'bigint', 'double', 'float']:
            null_count_expressions.append(count(when(col(c).isNull() | isnan(c), c)).alias(c))
        # For string columns, check for nulls and empty strings
        elif column_type == 'string':
            null_count_expressions.append(count(when(col(c).isNull() | (col(c) == ""), c)).alias(c))
        # For timestamp columns, check for nulls
        elif column_type == 'timestamp':
            null_count_expressions.append(count(when(col(c).isNull(), c)).alias(c))
        # For other types, only check for null
        else:
            null_count_expressions.append(count(when(col(c).isNull(), c)).alias(c))
    
    # Apply the expressions to the DataFrame
    null_counts = df.select(null_count_expressions)
    
    return null_counts

def drop_rows_with_nulls(df):
    """
    Drop rows where at least one value is null or empty string (for string columns) 
    or NaN (for numeric columns)
    """
    # Get the data types of each column
    column_types = dict(df.dtypes)
    
    # Create a filter condition to keep rows where all columns are non-null/non-empty/non-NaN
    filter_condition = None
    
    for c in df.columns:
        column_type = column_types[c]
        
        # For numeric columns (int, double, float), check both null and NaN
        if column_type in ['int', 'bigint', 'double', 'float']:
            condition = ~(col(c).isNull() | isnan(col(c)))
        # For string columns, check for nulls and empty strings
        elif column_type == 'string':
            condition = ~(col(c).isNull() | (col(c) == ""))
        # For timestamp columns, check for nulls
        elif column_type == 'timestamp':
            condition = ~col(c).isNull()
        # For other types, only check for null
        else:
            condition = ~col(c).isNull()
        
        # Combine conditions with AND
        if filter_condition is None:
            filter_condition = condition
        else:
            filter_condition = filter_condition & condition
    
    # Count before dropping
    count_before = df.count()
    
    # Filter the dataframe
    df_cleaned = df.filter(filter_condition)
    
    # Count after dropping
    count_after = df_cleaned.count()
    
    print(f"Dropped {count_before - count_after} rows with null values")
    print(f"Rows before: {count_before}, Rows after: {count_after}")
    
    return df_cleaned

def main(file_path):
    """
    Main function to process the Steam Reviews dataset
    """
    # TASK 1: Load the parquet file
    df, spark = load_parquet_file(file_path)
    
    # TASK 2: Drop specified columns
    columns_to_drop = [
        "author_playtime_last_two_weeks", 
        "recommendationid",
        "appid",
        "voted_up",
        "hidden_in_steam_china",
        "steam_china_location"
    ]
    df = drop_columns(df, columns_to_drop)
    
    # TASK 3: Convert data types
    df = convert_data_types(df)
    
    # TASK 4: Drop rows with null values
    # First, check for null values
    null_counts = check_null_values(df)
    print("Null value counts before dropping rows:")
    null_counts.show(truncate=False)
    
    # Drop rows with null values
    df_cleaned = drop_rows_with_nulls(df)
    
    # Verify no null values remain
    null_counts_after = check_null_values(df_cleaned)
    print("Null value counts after dropping rows:")
    null_counts_after.show(truncate=False)
    
    # Return the cleaned DataFrame
    return df_cleaned

if __name__ == "__main__":
    # Replace with your actual file path
    file_path = "data/all_reviews/1m_sample.parquet"
    
    # Process the dataset
    clean_df = main(file_path)
    



25/05/13 16:17:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Original Schema:
root
 |-- recommendationid: string (nullable = true)
 |-- appid: string (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playtime_at_review: string (nullable = true)
 |-- author_last_played: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- voted_up: string (nullable = true)
 |-- votes_up: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable =

                                                                                

+----+--------------+----------------------+------------------+-----------------------+-------------------------+------------------+--------+------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|game|author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_at_review|author_last_played|language|review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|
+----+--------------+----------------------+------------------+-----------------------+-------------------------+------------------+--------+------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|51  |14            |20                    |19                |16                     |16        

                                                                                

Dropped 26025 rows with null values
Rows before: 999365, Rows after: 973340
Null value counts after dropping rows:


                                                                                

+----+--------------+----------------------+------------------+-----------------------+-------------------------+------------------+--------+------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|game|author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_at_review|author_last_played|language|review|timestamp_created|timestamp_updated|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|
+----+--------------+----------------------+------------------+-----------------------+-------------------------+------------------+--------+------+-----------------+-----------------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+
|0   |0             |0                     |0                 |0                      |0         

In [47]:
null_counts = check_null_values(clean_df)
null_counts.head(50)

                                                                                

[Row(game=0, author_steamid=0, author_num_games_owned=0, author_num_reviews=0, author_playtime_forever=0, author_playtime_at_review=0, author_last_played=0, language=0, review=0, timestamp_created=0, timestamp_updated=0, votes_up=0, votes_funny=0, weighted_vote_score=0, comment_count=0, steam_purchase=0, received_for_free=0, written_during_early_access=0)]

In [48]:
column_to_drop = "game"  # Change this to the column you want to use for dropping
clean_df = drop_null_records(clean_df, column_to_drop)



Dropped 0 records where 'game' is null.
Records before: 973340, Records after: 973340


                                                                                

In [50]:
columns_to_analyze = [
    "timestamp_created"
]

analyze_string_columns_for_conversion(clean_df, columns_to_analyze)


ANALYSIS FOR COLUMN: timestamp_created


                                                                                

Total unique values: 970339


                                                                                

Count of null or empty strings: 0


                                                                                

Sample unique values (up to 20): [datetime.datetime(2022, 3, 24, 15, 55, 47), datetime.datetime(2021, 5, 28, 14, 21, 2), datetime.datetime(2020, 6, 12, 16, 52, 50), datetime.datetime(2019, 12, 29, 12, 40, 22), datetime.datetime(2018, 12, 9, 4, 43, 25), datetime.datetime(2018, 7, 27, 20, 14, 36), datetime.datetime(2017, 5, 5, 20, 17, 41), datetime.datetime(2016, 12, 17, 22, 29, 47), datetime.datetime(2016, 10, 5, 16, 4, 1), datetime.datetime(2014, 10, 31, 10, 14, 22), datetime.datetime(2023, 1, 19, 2, 35, 56), datetime.datetime(2019, 2, 23, 6, 35, 7), datetime.datetime(2020, 5, 5, 12, 30, 54), datetime.datetime(2022, 9, 17, 12, 27, 42), datetime.datetime(2020, 6, 26, 6, 58, 30), datetime.datetime(2015, 6, 15, 9, 21, 37), datetime.datetime(2021, 8, 2, 3, 13, 44), datetime.datetime(2019, 6, 29, 3, 24, 34), datetime.datetime(2014, 1, 25, 19, 11, 45), datetime.datetime(2020, 2, 6, 16, 46, 33)]
Top 10 most frequent values:


                                                                                

+-------------------+-----+
|timestamp_created  |count|
+-------------------+-----+
|2018-11-22 10:54:53|6    |
|2017-11-22 14:26:00|3    |
|2020-11-25 13:28:39|3    |
|2014-01-03 04:11:43|3    |
|2019-06-29 14:38:55|3    |
|2023-05-20 11:04:10|3    |
|2021-11-04 10:50:17|3    |
|2020-11-25 13:44:39|3    |
|2019-12-10 22:47:54|3    |
|2022-11-23 09:46:56|3    |
|2019-06-29 06:22:47|3    |
|2020-11-25 17:50:51|3    |
|2020-11-28 11:46:48|3    |
|2017-06-27 13:44:33|3    |
|2019-11-26 13:07:21|3    |
|2021-02-25 14:47:18|3    |
|2016-11-24 03:31:59|3    |
|2019-06-30 17:42:59|3    |
|2020-05-19 00:13:49|2    |
|2020-07-05 18:54:59|2    |
+-------------------+-----+



                                                                                

Values with leading/trailing whitespace: 0




Values with special characters: 973340


                                                                                