# 1.0 Importing Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import hashlib
import warnings
import os
import shutil
import time

pd.set_option("display.max_columns",None)
pd.set_option("display.max_rows",None)

warnings.filterwarnings("ignore")

In [9]:
def load_col(data_path, chunksize):
    """Read the CSV at ``data_path`` chunk by chunk and describe its last chunk.

    Args:
        data_path: location of the CSV file handed to ``pd.read_csv``.
        chunksize: number of rows per chunk.

    Returns:
        tuple: ``(column_names, chunk)`` where ``chunk`` is the final chunk
        produced by the reader and ``column_names`` is the list of its columns.
    """
    chunk_reader = pd.read_csv(data_path, chunksize=chunksize, iterator=False)
    # Walk through every chunk; only the last one is kept once the loop ends.
    for chunk in chunk_reader:
        continue
    return list(chunk.columns), chunk

# The names are kept in `column_names`, not `col_list`: a later cell defines a
# function called `col_list`, and rebinding that name to a list here would make
# the function uncallable whenever this cell is re-run afterwards.
# Forward slashes keep the relative path usable on every operating system.
column_names, data = load_col("../Data/CarSalePrice.csv", chunksize=100)

In [12]:
# LOADING PARAMETER
# Rows per chunk when reading the CSV; kept as an int because it is a row count.
chunksize = 100000
# Source CSV, written with forward slashes so the path also resolves outside Windows.
df_dir = "../Data/external/used_cars_data.csv"
# Reading only the header row of the data to get the column list
def col_list(data_path, chunksize, progress=None):
    """Return the column names of the CSV stored at ``data_path``.

    Only the header row is parsed (``nrows=0``), so the cost does not depend
    on the size of the file and a header-only file is handled as well.

    Args:
        data_path: location of the CSV file handed to ``pd.read_csv``.
        chunksize: unused; kept so existing calls keep working.
        progress: unused; kept so existing calls keep working.

    Returns:
        list: the column names in file order.
    """
    print("Checking and loading file")
    header_only = pd.read_csv(data_path, nrows=0)
    return list(header_only.columns)

# 1.0 Partitioning

## 1.1 Partitioning Parameter

In [4]:
# Module-level settings read by the partitioning functions in this notebook.
N_PARTITION = 50    # Number of buckets
# Root folder under which the per-bucket folders are created.
base_partitions_dir = "../data/external/Partition"
# Output folder; not referenced by the code visible in this notebook section.
output_dir = "../data/external/output"

## 1.2 Function for hashing the listing id

In [5]:
# Hash the listing id so rows spread evenly across the partitions
def hash_(listing_id):
    """Return the MD5 digest of ``str(listing_id)`` as a non-negative integer."""
    encoded_id = str(listing_id).encode("utf-8")
    hex_digest = hashlib.md5(encoded_id).hexdigest()
    return int(hex_digest, 16)

## 1.3 Creating the partition directories

In [13]:
def create_partition():
    """Recreate the partition directory tree with one empty folder per bucket.

    An existing ``base_partitions_dir`` is deleted first. Then the root and
    ``N_PARTITION`` sub-folders named ``p0`` ... ``p<N_PARTITION - 1>`` are
    created, including any missing parent folders. Progress and the elapsed
    time are printed.

    Returns:
        str: ``base_partitions_dir``, the root that now holds the bucket folders.
    """
    start = time.time()
    print("Checking if the directory exists...")
    if os.path.exists(base_partitions_dir):
        print("Directory found")
        print("Removing directory")
        shutil.rmtree(base_partitions_dir)
        print("Removed the directory")
        # Delaying before creating the directories; presumably this gives the
        # OS time to finish the removal - TODO confirm it is still required.
        time.sleep(2.5)
    else:
        print("No Such Directory found.")

    print("Creating empty folder list for partition")
    # makedirs creates missing parents too, and the root exists even when
    # N_PARTITION is 0.
    os.makedirs(base_partitions_dir, exist_ok=True)
    for i in range(N_PARTITION):
        partition_path = os.path.join(
            base_partitions_dir, "p{}".format(i)).replace("\\", "/")
        # The counter includes the folder that is about to be created.
        print('| {} | Partition left {} |'.format(partition_path, N_PARTITION - i))
        os.makedirs(partition_path, exist_ok=True)
    end = time.time()
    print("| Completed | Time Taken ------------------------- {}sec |".format(str(end - start)))
    return base_partitions_dir
# Making the directory; the result is bound to `partition_root` rather than
# `dir` so the built-in dir() stays usable in the rest of the notebook.
partition_root = create_partition()


Checking if the directory exists...
Directory found
Removing directory
Removed the directory
Creating empty folder list for partition
| ../data/external/Partition/p0 | Partition left 50 |
| ../data/external/Partition/p1 | Partition left 49 |
| ../data/external/Partition/p2 | Partition left 48 |
| ../data/external/Partition/p3 | Partition left 47 |
| ../data/external/Partition/p4 | Partition left 46 |
| ../data/external/Partition/p5 | Partition left 45 |
| ../data/external/Partition/p6 | Partition left 44 |
| ../data/external/Partition/p7 | Partition left 43 |
| ../data/external/Partition/p8 | Partition left 42 |
| ../data/external/Partition/p9 | Partition left 41 |
| ../data/external/Partition/p10 | Partition left 40 |
| ../data/external/Partition/p11 | Partition left 39 |
| ../data/external/Partition/p12 | Partition left 38 |
| ../data/external/Partition/p13 | Partition left 37 |
| ../data/external/Partition/p14 | Partition left 36 |
| ../data/external/Partition/p15 | Partition left 3

## 1.4 Creating blank partition files

In [28]:
def create_blank_partition():
    """Write a header-only CSV file into every partition folder.

    The header is the comma-joined column list returned by
    ``col_list(df_dir, chunksize)``. For each bucket ``i`` the file
    ``<base_partitions_dir>/p<i>/vehicle_used_data.csv`` is opened in write
    mode, so any previous content is replaced. The header is written without a
    trailing newline. Missing folders are created first.

    Returns:
        str or None: path (ending in "/") of the last partition folder
        written, or ``None`` when ``N_PARTITION`` is 0.
    """
    start = time.time()
    header_line = ",".join(col_list(df_dir, chunksize))
    # Stays None when the loop body never runs.
    file_base_dir = None
    for i in range(N_PARTITION):
        # The empty last component makes join() append a trailing separator.
        file_base_dir = os.path.join(
            base_partitions_dir, "p{}".format(i), "").replace("\\", "/")
        print(file_base_dir)
        # Create the folder if needed so the open() below cannot fail on it.
        os.makedirs(file_base_dir, exist_ok=True)
        # Opening the file and writing the header to the partition created
        with open(file_base_dir + "vehicle_used_data.csv", "w") as f:
            f.write(header_line)
    end = time.time()
    print("Time taken ------------------- | {}sec".format(str(end - start)))
    return file_base_dir
        
# Seed every partition folder with a header-only CSV; the function returns the
# path of the last partition folder it wrote.
dir_path = create_blank_partition()

../data/external/Partition/p0/
../data/external/Partition/p1/
../data/external/Partition/p2/


## 1.5 Partitioning by hashing

In [None]:
# Partitioning the rows of a DataFrame by hashing an id column
def partition_by_hashing(df, name, progress=None):
    """Append the rows of ``df`` to the per-bucket CSV files.

    The bucket of a row is ``hash_(value) % N_PARTITION`` where ``value`` is
    the row's entry in column ``name``. Rows are appended, without header or
    index, to ``<base_partitions_dir>/p<bucket>/vehicle_used_data.csv``, which
    is the file ``create_blank_partition()`` seeds with the header line.

    Args:
        df: pandas DataFrame holding the rows to distribute; it is not
            modified and no helper column is written to the files.
        name: name of the column whose values are hashed.
        progress: unused; kept so existing calls keep working.

    Returns:
        None
    """
    # The bucket ids are held outside the frame, so the CSV rows keep exactly
    # the columns announced by the header.
    bucket_ids = df[name].map(lambda value: hash_(value) % N_PARTITION).to_numpy()
    for bucket, rows in df.groupby(bucket_ids):
        bucket_dir = os.path.join(base_partitions_dir, "p{}".format(bucket))
        os.makedirs(bucket_dir, exist_ok=True)
        path_dir = os.path.join(bucket_dir, "vehicle_used_data.csv")

        # The seeded header has no trailing newline; add one only when the
        # file does not already end with it, so repeated appends leave no
        # blank lines behind.
        needs_newline = False
        if os.path.exists(path_dir) and os.path.getsize(path_dir) > 0:
            with open(path_dir, "rb") as existing:
                existing.seek(-1, os.SEEK_END)
                needs_newline = existing.read(1) != b"\n"
        if needs_newline:
            with open(path_dir, "a") as f:
                f.write("\n")

        # Writing the data to the path
        rows.to_csv(path_dir, mode="a", header=False, index=False)


# NOTE(review): create_blank_partition() reopens every partition file in "w"
# mode, so running this cell resets the files to header-only.
# The result is bound to a descriptive name instead of shadowing the built-in dir().
last_partition_dir = create_blank_partition()
os.listdir(last_partition_dir)

# Reading the file in chunks

In [33]:
# Rows per chunk requested from read_csv.
chunksize = 1e5
# With chunksize set, read_csv returns an iterator of DataFrames instead of one frame.
# NOTE(review): this path differs from df_dir ("..\Data\external\...") - confirm which location is correct.
temp = pd.read_csv(r"..\Data\used_cars_data.csv", iterator=True, chunksize=chunksize)
# Concatenating all chunks builds one DataFrame for the whole file, with a fresh 0..n-1 index.
df = pd.concat(temp, ignore_index=True)

# Disabled call that would distribute the rows into the partition files:
# data = partition_by_hashing(df, name="listing_id", progress=None)


  return func(*args, **kwargs)


In [5]:


# Partitioning the data
# The dataset contains more than 3 million records;
# they are split across 50 buckets,
# which gives about 60,000 records per bucket.


# PARTITIONING PARAMETER

N_PARTITION = 50    # Number of buckets
# Forward slashes so the relative path also resolves outside Windows.
base_partitions_dir = "../data/external/Partitioned"
output_dir = "../data/external/output"

# Hash the listing id so rows spread evenly across the partitions
def hash_(listing_id):
    """Return the MD5 digest of ``str(listing_id)`` read as a big-endian integer."""
    digest_bytes = hashlib.md5(str(listing_id).encode("utf-8")).digest()
    # Same value as parsing the hexadecimal digest in base 16.
    return int.from_bytes(digest_bytes, "big")

def create_partition():
    """Recreate ``<base_partitions_dir>/root`` with one empty folder per bucket.

    An existing ``base_partitions_dir`` is deleted first. Then the folders
    ``root/0`` ... ``root/<N_PARTITION - 1>`` are created, including any
    missing parent folders.

    Returns:
        None
    """
    print("Checking if the directory exists")
    if os.path.exists(base_partitions_dir):
        shutil.rmtree(base_partitions_dir)
        print("removed the directory")
        # Delaying before creating the directories; presumably this gives the
        # OS time to finish the removal - TODO confirm it is still required.
        time.sleep(0.5)
    else:
        print("No Such Directory found.")

    print("Creating empty folder list for partition")
    root_dir = os.path.join(base_partitions_dir, "root")
    # makedirs creates the missing parents (base dir and "root") as well.
    os.makedirs(root_dir, exist_ok=True)
    for i in range(N_PARTITION):
        # os.path.join only accepts strings, so the bucket number is converted.
        partition_path = os.path.join(root_dir, str(i))
        print(partition_path)
        os.makedirs(partition_path, exist_ok=True)



def create_blank_partition():
    """Write a header-only CSV into every bucket folder under ``root``.

    The header row is read from the source CSV with ``nrows=0``, so no data
    rows are parsed. For each bucket ``i`` the file
    ``<base_partitions_dir>/root/<i>/vehicle_used_data.csv`` is opened in
    write mode, replacing previous content. The header is written without a
    trailing newline. Missing folders are created first.

    Returns:
        str: the ``root`` folder that contains the bucket folders.
    """
    # Forward slashes so the relative path also resolves outside Windows.
    file_path = "../data/external/used_cars_data.csv"
    column_names = pd.read_csv(file_path, nrows=0).columns
    header_line = ",".join(str(column) for column in column_names)

    root_dir = os.path.join(base_partitions_dir, "root")
    os.makedirs(root_dir, exist_ok=True)
    for i in range(N_PARTITION):
        # os.path.join only accepts strings, so the bucket number is converted.
        partition_dir = os.path.join(root_dir, str(i))
        os.makedirs(partition_dir, exist_ok=True)
        with open(os.path.join(partition_dir, "vehicle_used_data.csv"), "w") as f:
            f.write(header_line)

    # Returned after the loop so every bucket is written before returning.
    return root_dir

def partition_by_hashing(df, name, progress=None):
    """Append the rows of ``df`` to the bucket files under ``root``.

    The bucket of a row is ``hash_(value) % N_PARTITION`` where ``value`` is
    the row's entry in column ``name``. Rows are appended, without header or
    index, to ``<base_partitions_dir>/root/<bucket>/vehicle_used_data.csv``.

    Args:
        df: pandas DataFrame holding the rows to distribute; it is not
            modified and no helper column is written to the files.
        name: name of the column whose values are hashed.
        progress: unused; kept so existing calls keep working.

    Returns:
        None
    """
    # The bucket ids are held outside the frame so the input keeps its columns.
    bucket_ids = df[name].map(lambda value: hash_(value) % N_PARTITION).to_numpy()
    root_dir = os.path.join(base_partitions_dir, "root")
    for bucket, rows in df.groupby(bucket_ids):
        # os.path.join only accepts strings, so the bucket number is converted.
        bucket_dir = os.path.join(root_dir, str(bucket))
        os.makedirs(bucket_dir, exist_ok=True)
        path_dir = os.path.join(bucket_dir, "vehicle_used_data.csv")

        # If the file ends without a newline (for example a bare header line),
        # terminate that line first so the new rows start on their own line.
        needs_newline = False
        if os.path.exists(path_dir) and os.path.getsize(path_dir) > 0:
            with open(path_dir, "rb") as existing:
                existing.seek(-1, os.SEEK_END)
                needs_newline = existing.read(1) != b"\n"
        if needs_newline:
            with open(path_dir, "a") as f:
                f.write("\n")

        # Append mode keeps the rows written by earlier calls.
        rows.to_csv(path_dir, mode="a", header=False, index=False)
        
# The result is bound to a descriptive name instead of shadowing the built-in dir().
created_dir = create_blank_partition()
os.listdir(created_dir)


# Build the Spark session, or reuse the one that is already running
SPARK_SESSION = SparkSession \
    .builder \
    .appName("Preprocessing with Spark") \
    .getOrCreate()

# Read the CSV with its first line as header and let Spark infer column types
df = SPARK_SESSION.read.csv(
    r"..\data\external\used_cars_data.csv", header=True, inferSchema=True )

# Only the main used-cars file is loaded at this point.



df.printSchema()
# NOTE(review): the schema printed below reports every column as string even
# though inferSchema=True - presumably some rows are split by line breaks or
# quotes inside free-text fields; verify the CSV reader options.

root
 |-- vin: string (nullable = true)
 |-- back_legroom: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- bed_height: string (nullable = true)
 |-- bed_length: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- city: string (nullable = true)
 |-- city_fuel_economy: string (nullable = true)
 |-- combine_fuel_economy: string (nullable = true)
 |-- daysonmarket: string (nullable = true)
 |-- dealer_zip: string (nullable = true)
 |-- description: string (nullable = true)
 |-- engine_cylinders: string (nullable = true)
 |-- engine_displacement: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- exterior_color: string (nullable = true)
 |-- fleet: string (nullable = true)
 |-- frame_damaged: string (nullable = true)
 |-- franchise_dealer: string (nullable = true)
 |-- franchise_make: string (nullable = true)
 |-- front_legroom: string (nullable = true)
 |-- fuel_tank_volume: string (nullable = tr

In [22]:
# Columns to load from the vehicles CSV
cols = ["region","price","year","manufacturer","model",
            "condition","cylinders","fuel","odometer","transmission",
                "drive","size","type","state","lat","long","posting_date"]
# Reading only the selected columns of the file
data_f = pd.read_csv(
    r"..\data\external\vehicles.csv", sep=",", usecols=cols)

# Work on a copy so data_f keeps the values exactly as loaded
data = data_f.copy()
# Parse the posting_date column into pandas datetime values
data['posting_date'] = pd.to_datetime(data['posting_date'])



In [24]:
# Number of missing values in each column
data.isnull().sum()

region               0
price                0
year              1205
manufacturer     17646
model             5277
condition       174104
cylinders       177678
fuel              3013
odometer          4400
transmission      2556
drive           130567
size            306361
type             92858
state                0
lat               6549
long              6549
posting_date        68
dtype: int64

In [28]:
def del_var(dataset, threshold=0.5):
    """Drop every column whose share of missing values exceeds ``threshold``.

    A line ``dropped <column>`` is printed for each removed column, in column
    order.

    Args:
        dataset (pandas.DataFrame): frame to inspect; it is not modified.
        threshold (float): largest tolerated fraction of missing values in a
            column. Columns strictly above it are removed. Defaults to 0.5.

    Returns:
        pandas.DataFrame: ``dataset`` itself when no column is removed,
        otherwise a new frame without the removed columns.
    """
    # Fraction of nulls per column, computed once for the whole frame.
    missing_share = dataset.isnull().mean()
    sparse_columns = missing_share[missing_share > threshold].index.tolist()

    for feature in sparse_columns:
        print("dropped {}".format(feature))

    if not sparse_columns:
        return dataset
    return dataset.drop(columns=sparse_columns)

# Remove the columns in which more than half of the values are missing
data_ = del_var(data)


dropped size


In [26]:
# Preview the first rows of the frame returned by del_var
data_.head()

Unnamed: 0,region,price,year,manufacturer,model,condition,cylinders,fuel,odometer,transmission,drive,type,state,lat,long,posting_date
0,prescott,6000,,,,,,,,,,,az,,,
1,fayetteville,11900,,,,,,,,,,,ar,,,
2,florida keys,21000,,,,,,,,,,,fl,,,
3,worcester / central MA,1500,,,,,,,,,,,ma,,,
4,greensboro,4900,,,,,,,,,,,nc,,,


In [13]:
# Collect the distinct values of the `type` column.
# NOTE(review): the output below contains entries such as '-121.7473' and
# ' used cars', which look like values from other fields - presumably rows
# mis-parsed by the CSV reader; verify the read options.
df.select("type").distinct().collect()

[Row(type='van'),
 Row(type='mini-van'),
 Row(type='offroad'),
 Row(type='wagon'),
 Row(type=None),
 Row(type='coupe'),
 Row(type='bus'),
 Row(type='SUV'),
 Row(type='other'),
 Row(type='convertible'),
 Row(type='-121.7473'),
 Row(type='sedan'),
 Row(type='hatchback'),
 Row(type='truck'),
 Row(type='pickup'),
 Row(type=' used cars'),
 Row(type=' 645'),
 Row(type=' accuracy'),
 Row(type=' GMC '),
 Row(type=' Orlando Car Deals'),
 Row(type=' dually'),
 Row(type=' S550')]

In [None]:
# For the normal operation