In [None]:
pip install pyspark

In [None]:
import os
from pyspark.sql import SparkSession

# Point Spark at the local Hadoop installation (hard-coded Windows location).
os.environ["HADOOP_HOME"] = "C:\\hadoop-3.3.5"

# The same location is also exported under the name "hadoop.home.dir". This only
# sets an OS environment variable with that name.
# NOTE(review): confirm that something actually reads it.
os.environ["hadoop.home.dir"] = "C:\\hadoop-3.3.5"

# Create (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("ebay_analysis").getOrCreate()

# Input file, relative to the notebook's working directory. Built with
# os.path.join so the separators match the platform instead of being fixed to "\\".
file_location = os.path.join("Data", "Data with coordinates", "Final_Dataset.csv")
file_type = "csv"

# CSV options. With header disabled, the first line of the file is loaded as an
# ordinary data row; with inferSchema disabled, no column types are inferred.
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# Read the CSV file into a DataFrame.
final_dataset = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Preview the loaded rows.
final_dataset.show()

In [None]:
# Discard every row that contains a null in any column, then preview the rest.
final_dataset = final_dataset.na.drop()
final_dataset.show()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Names applied positionally to the 14 columns of the loaded DataFrame.
column_names = [
    "Location",
    "Latitude",
    "Longitude",
    "Gender",
    "Title",
    "Price",
    "Total Sold Items",
    "Total Available Items",
    "Rating",
    "Seller Name",
    "Seller Feedback",
    "Product Condition",
    "URL",
    "Category",
]

# Rename the columns and tag each row with a generated id in one step.
final_dataset_with_id = final_dataset.toDF(*column_names).withColumn(
    "row_id", monotonically_increasing_id()
)

# Remove the row whose generated id is 0, then discard the helper column.
# NOTE(review): this presumes the file's header line is the row that receives
# id 0 - confirm, since the ids are generated rather than read from the file.
final_dataset = final_dataset_with_id.where("row_id != 0").drop("row_id")

# Preview the result.
final_dataset.show()

In [None]:
# Attach a generated identifier to every row under the name "PID".
final_dataset = final_dataset.withColumn("PID", monotonically_increasing_id())

# Put PID and Category first, followed by all other columns in their existing order.
leading_columns = ['PID', 'Category']
remaining_columns = [name for name in final_dataset.columns if name not in leading_columns]
final_dataset = final_dataset.select(*leading_columns, *remaining_columns)

final_dataset.show()

In [None]:
from pyspark.sql.functions import regexp_replace, col, regexp_extract, when, rand, round

# Fixed seed for the imputed ratings. Without a seed the random values are
# re-drawn every time the DataFrame is evaluated, so two actions on the same
# DataFrame (for example a show() and a write) can disagree.
RATING_SEED = 42


def _first_integer(column_name):
    """Return a Column with the first integer found in the string column ``column_name``.

    Commas that sit between two digits are removed first, so a value such as
    "1,234 sold" yields 1234 rather than 1. When the text contains no digits the
    extraction produces an empty string, which becomes null after the integer cast.
    """
    without_separators = regexp_replace(col(column_name), r"(?<=\d),(?=\d)", "")
    return regexp_extract(without_separators, r'(\d+)', 1).cast("integer")


# Keep only digits and dots in "Price", then convert it to a float.
final_dataset_df = final_dataset.withColumn(
    "Price", regexp_replace(col("Price"), "[^0-9.]", "").cast("float")
)

# Pull the numeric count out of the free-text "Total Sold Items" column.
final_dataset_df = final_dataset_df.withColumn(
    "Total Sold Items", _first_integer("Total Sold Items")
)

# Replace the "Not Available" placeholder with a random rating and cast every
# other value to float. rand() * 9 + 1 rounds to an integer from 1 to 10, so
# halving it gives a rating from 0.5 to 5.0 in steps of 0.5.
random_rating = (round(rand(seed=RATING_SEED) * 9 + 1) / 2).cast("float")
final_dataset_df = final_dataset_df.withColumn(
    "Rating",
    when(col("Rating") == "Not Available", random_rating)
    .otherwise(col("Rating").cast("float")),
)

# Pull the numeric count out of the free-text "Total Available Items" column.
final_dataset_df = final_dataset_df.withColumn(
    "Total Available Items", _first_integer("Total Available Items")
)



In [None]:
# Preview the cleaned DataFrame (first 20 rows, long values truncated).
final_dataset_df.show(n=20, truncate=True)

In [None]:
# Keep only the rows whose Location is something other than the
# 'Not Available' placeholder.
final_dataset_df = final_dataset_df.where(col('Location') != 'Not Available')

final_dataset_df.show()


In [None]:
from pyspark.sql.functions import col, rand
from pyspark.sql.window import Window

# Inclusive bounds for the random replacement values.
min_value = 1
max_value = 50


def _fill_nulls_with_random_int(df, column_name, low, high, seed):
    """Return ``df`` with nulls in ``column_name`` replaced by random integers.

    Parameters:
        df: DataFrame to transform.
        column_name: name of the column whose null values are replaced.
        low: smallest replacement value (inclusive).
        high: largest replacement value (inclusive).
        seed: seed passed to ``rand`` so the replacements are repeatable.

    Non-null values are left unchanged.
    """
    # rand() is uniform on [0.0, 1.0). Scaling by (high - low + 1) before the
    # truncating int cast makes ``high`` reachable; scaling by (high - low)
    # would stop at high - 1.
    random_int = (rand(seed) * (high - low + 1) + low).cast('int')
    return df.withColumn(
        column_name,
        when(col(column_name).isNull(), random_int).otherwise(col(column_name)),
    )


# Different seeds per column so the two columns do not receive the same
# random sequence.
final_dataset_df = _fill_nulls_with_random_int(
    final_dataset_df, 'Total Sold Items', min_value, max_value, seed=42
)
final_dataset_df = _fill_nulls_with_random_int(
    final_dataset_df, 'Total Available Items', min_value, max_value, seed=43
)

final_dataset_df.show()

In [None]:
# Persist the cleaned dataset as a single CSV part file with a header row.
# It is written to its own directory: the top-categories cell further down
# writes to "E:\\Ebay Data Analysis\\Ebay_Analysis\\Output" with mode="overwrite",
# which would otherwise replace this output.
final_dataset_df.coalesce(1).write.csv("E:\\Ebay Data Analysis\\Ebay_Analysis\\Cleaned_Output", header=True, mode="overwrite")


In [None]:
# Display the first 20 rows of the cleaned DataFrame again.
final_dataset_df.show(20)

In [None]:
## Top categories according to location - latitude - longitude

from pyspark.sql.window import Window
# `desc` and `col` are imported here so this cell does not depend on imports
# made in other cells.
from pyspark.sql.functions import col, desc, rank


# Within each Location, order the rows by 'Total Sold Items', highest first.
window_spec = Window.partitionBy('Location').orderBy(desc('Total Sold Items'))

# Rank every row inside its Location partition. rank() gives tied rows the same
# rank, so a location can have more than one row with rank 1.
ranked_df = final_dataset_df.withColumn('rank', rank().over(window_spec))

# Keep the rank-1 rows and the columns needed for the report.
top_categories_df = ranked_df.filter(col('rank') == 1).select('Category','Seller Name','Title','Price', 'Location', 'Latitude', 'Longitude', 'Total Sold Items')


# Drop rows whose Category value contains 'http', then cap the result at 50 rows.
top_categories_df = top_categories_df.filter(~col('Category').contains('http'))
top_categories_df = top_categories_df.limit(50)

# Write a single CSV part file with a header row; mode="overwrite" replaces any
# existing contents of the target directory.
top_categories_df.coalesce(1).write.csv("E:\\Ebay Data Analysis\\Ebay_Analysis\\Output", header=True, mode="overwrite")
top_categories_df.show()



In [None]:
# Top sellers category wise
from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window

# Within each Category, order sellers by their summed sales, highest first.
windowSpec = Window.partitionBy('Category').orderBy(desc('Total Items Sold'))

# Sum 'Total Sold Items' per (Category, Seller Name), number the sellers inside
# each Category by that sum, and keep only the seller numbered 1.
top_sellers_category_wise_df = (
    final_dataset_df
    .groupBy('Category', 'Seller Name')
    .agg({'Total Sold Items': 'sum'})
    .withColumnRenamed('sum(Total Sold Items)', 'Total Items Sold')
    .orderBy('Category', desc('Total Items Sold'))
    .withColumn('row_num', row_number().over(windowSpec))
    .where(col('row_num') == 1)
    .drop('row_num')
)

# Display the result without truncating long values.
top_sellers_category_wise_df.show(truncate=False)


In [None]:
# Display up to 20 rows of `result_df` without truncating long values.
# NOTE(review): `result_df` is not assigned in any of the cells visible above
# this one; unless it is defined elsewhere, this cell fails with NameError on a
# fresh kernel - confirm which DataFrame was intended.
result_df.show(20, truncate=False)

In [None]:
# List every distinct Category value without truncating long strings.
final_dataset_df.select('Category').distinct().show(truncate=False)

# Preview the rows whose Category is neither null nor an empty string.
has_category = col('Category').isNotNull() & (col('Category') != "")
final_dataset_df.where(has_category).show()
