In [1]:
import pandas as pd
import numpy as np
import sys
import os

In [2]:
!{sys.executable} -m pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)
     ---------------------------------------- 0.0/317.2 MB ? eta -:--:--
     --------------------------------------- 2.1/317.2 MB 10.7 MB/s eta 0:00:30
      -------------------------------------- 4.7/317.2 MB 11.4 MB/s eta 0:00:28
      -------------------------------------- 6.3/317.2 MB 11.7 MB/s eta 0:00:27
     - ------------------------------------- 8.9/317.2 MB 10.6 MB/s eta 0:00:29
     - ------------------------------------ 11.3/317.2 MB 10.8 MB/s eta 0:00:29
     - ------------------------------------ 13.9/317.2 MB 11.0 MB/s eta 0:00:28
     - ------------------------------------ 16.3/317.2 MB 11.2 MB/s eta 0:00:27
     -- ----------------------------------- 18.9/317.2 MB 11.3 MB/s eta 0:00:27
     -- ----------------------------------- 21.5/317.2 MB 11.4 MB/s eta 0:00:26
     -- ----------------------------------- 24.1/317.2 MB 11.5 M

In [4]:
import polars as pl

In [None]:
file_path_product = "meta_Kindle_store.parquet"
df_p = pl.read_parquet(file_path_product)
try:
    df_p = pl.read_parquet(file_path_product)
    print(df_p) #or df_p.head(), df_p.tail(), df_p.to_string()
except FileNotFoundError:
    print(f"Error: File not found at {file_path_product}")
except Exception as e:
    print(f"An error occurred: {e}")

In [6]:
df_p.head()

rating,title,text,images,asin,parent_asin,user_id,timestamp,helpful_vote,verified_purchase
f64,str,str,list[struct[4]],str,str,str,i64,i64,bool
5.0,"""excellent writer reminds me of…","""GRUMLEY is on par with Clive C…",[],"""B00LXRJICK""","""B00LXRJICK""","""AFKZENTNBQ7A7V7UXW5JJI6UGRYQ""",1427541413000,0,False
3.0,"""Alright book""","""The book Fade was not my favor…",[],"""B073DFP8VC""","""B073DFP8VC""","""AHGTHCERTEZUXNBLJ5SWHK2CDLXA""",1504226946142,0,True
5.0,"""Hats off to Fern Michaels for …","""I have been a fan of this auth…",[],"""B07QVH25KX""","""B07QVH25KX""","""AHFY2QSS6PK5MHSYZFI6TXUYNPLQ""",1644883955777,0,True
5.0,"""Great read""","""This book is more than just ab…",[],"""B004Y1NWQU""","""B004Y1NWQU""","""AHFY2QSS6PK5MHSYZFI6TXUYNPLQ""",1363027885000,0,False
5.0,"""Add to legend f Arthur""","""Good twist on the ledgen of Ki…",[],"""B08M993CNC""","""B08M993CNC""","""AFWHJ6O3PV4JC7PVOJH6CPULO2KQ""",1637557512064,0,True


In [5]:
file_path_customer = "Kindle_store.parquet"
try:
    df_c = pl.read_parquet(file_path_customer)
    df_c.head()
except FileNotFoundError:
    print(f"Error: File not found at {file_path_customer}")
except Exception as e:
    print(f"An error occurred: {e}")

In [29]:
#!/usr/bin/env python3
"""
Pandas script for preprocessing the customer.xlsx file.
It performs the following:
    - Reads the Excel file.
    - Drops the 'images' column if present.
    - Casts selected columns to appropriate types.
    - Computes a new "year" column by converting the original timestamp (in ms) to a year.
    - Scales the timestamp from milliseconds to days.
    - Computes counts per user_id and parent_asin.
    - Removes rows where either count is less than 3.
    - Prints removal statistics and the number of items per year.
    - Writes the cleaned DataFrame with extra count columns to a new Excel file.
"""

import pandas as pd

def preprocess_customer(input_path="customer.xlsx", output_path="customer_clean.xlsx"):
    df = pd.read_excel(input_path, engine="openpyxl")
    print(df.size)
    if "images" in df.columns:
        df = df.drop(columns=["images"])
    df["rating"] = df["rating"].astype(int)
    df["title"] = df["title"].astype(str)
    df["text"] = df["text"].astype(str)
    df["asin"] = df["asin"].astype(str)
    df["parent_asin"] = df["parent_asin"].astype(str)
    df["user_id"] = df["user_id"].astype(str)
    #df["timestamp"] = df["timestamp"].astype(int)
    df["helpful_vote"] = df["helpful_vote"].astype(int)
    df["verified_purchase"] = df["verified_purchase"].astype(bool)
    # Compute counts per user_id and parent_asin.
    df["count_user_id"] = df.groupby("user_id")["user_id"].transform("count")
    df["count_parent_asin"] = df.groupby("parent_asin")["parent_asin"].transform("count")
    
    total_items = df.shape[0]
    
    print("Total items before filtering:", total_items)
    
    df.to_excel(output_path, index=False, engine="openpyxl")
    return None

if __name__ == "__main__":
    preprocess_customer("customer.xlsx", "customer_clean.xlsx")


500000
Total items before filtering: 50000


In [15]:
import pandas as pd
# Import the cleaned Kindle Store dataset.
df_clean = pd.read_parquet("Kindle_Store_clean.parquet", engine="pyarrow")
print("Number of items per year:")
df_clean["year"] = pd.to_datetime(df_clean["timestamp"] * 86400, unit="s").dt.year
year_counts = df_clean.groupby("year").size().sort_index()
for yr, count in year_counts.items():
    print(f"Year {yr}: {count} items")

Number of items per year:
Year 1969: 10020939 items
Year 1970: 10014420 items


In [34]:
#!/usr/bin/env python3
"""
Pandas script for preprocessing the product.xlsx file.
It performs the following:
    - Reads the Excel file using openpyxl.
    - Drops unnecessary columns: main_category, images, videos, store, features, description.
    - Extracts the author’s name from the JSON in the author column.
    - Cleans the categories column by removing 'Kindle Store' and 'Kindle eBooks'.
    - From the details JSON, extracts:
         - 'Publication date': converts a date like "May 1, 2013" into Unix epoch days (float).
         - 'Print length': extracts the number from a string like "278 pages" and converts to int.
         - 'Language': extracts the language.
    - Casts the remaining columns to the proper types.
    - Writes the cleaned DataFrame to a new Excel file.
"""

import pandas as pd
from datetime import datetime
import ast

def extract_author_name(author):
    """
    Given a JSON-like author value (as a dict or string), extracts the 'name' field.
    Uses ast.literal_eval if the value is a string.
    """
    if isinstance(author, dict):
        return author.get("name", None)
    if isinstance(author, str):
        try:
            # Convert string representation of dict to a Python dict.
            author_dict = ast.literal_eval(author)
            return author_dict.get("name", None)
        except Exception:
            return author
    return author

def clean_categories(categories):
    """
    Removes 'Kindle Store' and 'Kindle eBooks' from the categories.
    If categories is a list, returns a list without the unwanted items.
    If it is a string, splits on commas, filters, and rejoins.
    """
    unwanted = {"Kindle Store", "Kindle eBooks"}
    if isinstance(categories, list):
        return [cat for cat in categories if cat not in unwanted]
    if isinstance(categories, str):
        cats = [cat.strip() for cat in categories.split(",")]
        filtered = [cat for cat in cats if cat not in unwanted]
        return ", ".join(filtered)
    return categories

def extract_details(details):
    """
    Given a JSON-like details value (as a dict or string), extracts:
      - Publication date: parses a date like "May 1, 2013" and converts it to Unix epoch days.
      - Print length: extracts numeric part from a string like "278 pages".
      - Language: extracts the language.
    Uses ast.literal_eval if needed.
    """
    pub_date = None
    print_length_str = None
    language = None
    
    if isinstance(details, dict):
        pub_date = details.get("Publication date", None)
        print_length_str = details.get("Print length", None)
        language = details.get("Language", None)
    elif isinstance(details, str):
        try:
            details_dict = ast.literal_eval(details)
            pub_date = details_dict.get("Publication date", None)
            print_length_str = details_dict.get("Print length", None)
            language = details_dict.get("Language", None)
        except Exception:
            pass

    # Convert publication date to Unix epoch days.
    pub_days = None
    if pub_date:
        try:
            # Try a known format e.g., "May 1, 2013"
            dt = datetime.strptime(pub_date, "%B %d, %Y")
            pub_days = dt.timestamp() / 86400
        except Exception:
            try:
                # Fall back to pandas conversion.
                dt = pd.to_datetime(pub_date)
                pub_days = dt.timestamp() / 86400
            except Exception:
                pub_days = None

    # Extract print length from a string like "278 pages".
    print_length = None
    if print_length_str:
        digits = ''.join(filter(str.isdigit, str(print_length_str)))
        if digits:
            print_length = int(digits)
    
    return pub_days, print_length, language

def preprocess_product(input_path="product.xlsx", output_path="product_clean.xlsx"):
    # Read the Excel file using openpyxl.
    df = pd.read_excel(input_path, engine="openpyxl")
    
    # Drop unnecessary columns.
    cols_to_drop = ["main_category", "images", "videos", "store", "features", "description"]
    df = df.drop(columns=cols_to_drop, errors="ignore")
    
    # Process the author column to extract the 'name' from the JSON.
    df["author"] = df["author"].apply(extract_author_name)
    
    # Clean the categories column.
    df["categories"] = df["categories"].apply(clean_categories)
    df["categories"] = df["categories"].apply(lambda x: ", ".join(x) if isinstance(x, list) else x)
    
    # Extract details from the details column.
    details_extracted = df["details"].apply(extract_details)
    df[["publication_date", "print_length", "language"]] = pd.DataFrame(details_extracted.tolist(), index=df.index)
    
    # Cast remaining columns.
    df["title"] = df["title"].astype(str)
    df["subtitle"] = df["subtitle"].astype(str)
    df["average_rating"] = df["average_rating"].astype(float)
    df["rating_number"] = df["rating_number"].astype(int)
    df["price"] = df["price"].astype(float)
    df["parent_asin"] = df["parent_asin"].astype(str)
    
    # Select final columns in the desired order.
    final_columns = [
        "title", "subtitle", "author", "average_rating", "rating_number", "price",
        "categories", "publication_date", "parent_asin", "language", "print_length"
    ]
    df_clean = df[final_columns]
    
    df_clean.to_excel(output_path, index=False, engine="openpyxl")
    return df_clean

if __name__ == "__main__":
    preprocess_product("product.xlsx", "product_clean.xlsx")


In [25]:
MAX_ROWS_TO_SAVE = 50000

def save_first_n_rows_to_excel(df: pl.DataFrame, output_file: str, n: int = MAX_ROWS_TO_SAVE):
    """
    Retrieves the first 'n' rows of a Polars DataFrame and saves them to an Excel file.

    Args:
        df: The Polars DataFrame.
        output_file: The path to the Excel file to save.
        n: The number of rows to select (default: MAX_ROWS_TO_SAVE).
    """
    try:
        selected_rows = df.head(n) if df.height > n else df
        pandas_df = selected_rows.to_pandas()
        pandas_df.to_excel(output_file, index=False)
        print(f"First {min(n, df.height)} rows saved to {output_file}")
    except Exception as e:
        print(f"An error occurred: {e}")
try:
    file_path_product = "meta_Kindle_store.parquet"
    df_p = pl.read_parquet(file_path_product)
    file_path_customer = "Kindle_store.parquet"
    df_c = pl.read_parquet(file_path_customer)
    save_first_n_rows_to_excel(df_p, "product.xlsx")
    save_first_n_rows_to_excel(df_c, "customer.xlsx")
except FileNotFoundError:
    print("Error: File not found at the specified path")
except Exception as e:
    print(f"An error occurred: {e}")


First 50000 rows saved to product.xlsx
First 50000 rows saved to customer.xlsx


In [36]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Step 1: Create a Spark session
spark = SparkSession.builder \
    .appName("SimpleSparkApp") \
    .getOrCreate()

# Step 2: Create a simple DataFrame
# Let's create some sample data
data = [(1, "John Doe", 29), (2, "Jane Smith", 35), (3, "Sam Brown", 22)]
columns = ["id", "name", "age"]

# Convert the list of Row objects to a DataFrame
df = spark.createDataFrame(data,schema=columns)

# Show the DataFrame
df.show()

# write the data into a csv file
df.write.csv("output_data.csv", header=True)

# stop the spark application
spark.stop()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
import polars as pl
import os

def split_polars_dataframe(df: pl.DataFrame, num_files: int, output_dir: str, base_filename: str):
    """
    Splits a Polars DataFrame into 'num_files' equal-sized files.

    Args:
        df: The Polars DataFrame to split.
        num_files: The number of files to create.
        output_dir: The directory to save the split files.
        base_filename: The base filename for the split files.
    """
    try:
        os.makedirs(output_dir, exist_ok=True)  # Create the output directory if it doesn't exist

        chunk_size = len(df) // num_files
        remainder = len(df) % num_files

        start_index = 0
        for i in range(num_files):
            end_index = start_index + chunk_size
            if i < remainder:  # Distribute the remainder rows
                end_index += 1

            chunk = df[start_index:end_index]
            output_file = os.path.join(output_dir, f"{base_filename}_{i + 1}.parquet")
            chunk.write_parquet(output_file)
            print(f"Saved chunk {i + 1} to {output_file}")

            start_index = end_index

    except Exception as e:
        print(f"An error occurred: {e}")

# Example usage:
file_path = "polars_data.parquet" #replace with your file
output_directory = "split_files"
base_filename = "data_chunk"
num_files_to_split = 5

try:
    df = pl.read_parquet(file_path)
    split_polars_dataframe(df, num_files_to_split, output_directory, base_filename)

except FileNotFoundError:
    print(f"Error: File not found at {file_path}")
except Exception as e:
    print(f"An error occurred: {e}")