In [1]:
!wget https://archive.ics.uci.edu/static/public/352/online+retail.zip

--2024-11-19 08:07:19--  https://archive.ics.uci.edu/static/public/352/online+retail.zip
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘online+retail.zip’

online+retail.zip       [  <=>               ]  22.62M  88.0MB/s    in 0.3s    

2024-11-19 08:07:19 (88.0 MB/s) - ‘online+retail.zip’ saved [23715478]



In [2]:
!unzip online+retail.zip

Archive:  online+retail.zip
 extracting: Online Retail.xlsx      


In [3]:
import pandas as pd
data = pd.read_excel("Online Retail.xlsx")
data.to_csv("Online Retail.csv", index=False)

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, sum, avg, count, max, min, year, month, dayofmonth, desc
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Data Visualization Libraries
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

class RetailDataAnalysis:
    def __init__(self, file_path):
        """
        Initialize Spark session and load retail data

        Args:
            file_path (str): Path to the UCI Online Retail CSV file
        """
        self.spark = SparkSession.builder \
            .appName("UCI Online Retail Comprehensive Analysis") \
            .config("spark.sql.pandas.outputConvertMode", "arrow") \
            .getOrCreate()

        # Set visualization style
        plt.style.use("seaborn-v0_8")
        sns.set_palette("deep")

        # Load and preprocess data
        self.df = self.load_retail_data(file_path)

    def load_retail_data(self, file_path):
        """
        Load and clean UCI Online Retail dataset

        Returns:
            DataFrame: Processed retail dataset
        """
        schema = StructType([
            StructField("InvoiceNo", StringType(), True),
            StructField("StockCode", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), True),
            StructField("InvoiceDate", TimestampType(), True),
            StructField("UnitPrice", DoubleType(), True),
            StructField("CustomerID", StringType(), True),
            StructField("Country", StringType(), True)
        ])

        df = self.spark.read \
            .format("csv") \
            .option("header", "true") \
            .schema(schema) \
            .load(file_path)

        # Data cleaning and preprocessing
        df = df.filter(col("Quantity") > 0) \
               .filter(col("UnitPrice") > 0) \
               .na.drop(subset=["CustomerID"])

        df = df.withColumn("TotalSales", col("Quantity") * col("UnitPrice"))

        return df

    def country_sales_visualization(self):
        """
        Visualize sales distribution across countries
        """
        country_sales = self.df.groupBy("Country") \
            .agg(
                sum("TotalSales").alias("TotalCountrySales"),
                count("InvoiceNo").alias("TransactionCount")
            ) \
            .orderBy(col("TotalCountrySales").desc()) \
            .toPandas()

        plt.figure(figsize=(15, 6))
        plt.subplot(121)
        sns.barplot(x='Country', y='TotalCountrySales', data=country_sales.head(10))
        plt.title('Top 10 Countries by Total Sales')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()

        plt.subplot(122)
        sns.barplot(x='Country', y='TransactionCount', data=country_sales.head(10))
        plt.title('Top 10 Countries by Transaction Volume')
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()

        plt.savefig('country_sales_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()

    def time_series_analysis(self):
        """
        Perform and visualize time series analysis
        """
        from pyspark.sql.functions import to_date

        # Monthly sales trend
        monthly_sales = self.df.groupBy(
            year("InvoiceDate").alias("Year"),
            month("InvoiceDate").alias("Month")
        ).agg(
            sum("TotalSales").alias("MonthlySales")
        ).orderBy("Year", "Month").toPandas()

        plt.figure(figsize=(15, 6))
        plt.subplot(121)
        sns.lineplot(x='Month', y='MonthlySales', hue='Year', data=monthly_sales)
        plt.title('Monthly Sales Trend')
        plt.xlabel('Month')
        plt.ylabel('Total Sales')

        plt.subplot(122)
        # Daily sales trend
        daily_sales = self.df.groupBy(to_date(col("InvoiceDate")).alias("Date")) \
            .agg(sum("TotalSales").alias("DailySales")) \
            .orderBy("Date").toPandas()

        sns.lineplot(x='Date', y='DailySales', data=daily_sales)
        plt.title('Daily Sales Trend')
        plt.xticks(rotation=45)
        plt.tight_layout()

        plt.savefig('time_series_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()

    def product_analysis(self):
        """
        Analyze and visualize product sales
        """
        product_sales = self.df.groupBy("StockCode", "Description") \
            .agg(
                sum("Quantity").alias("TotalQuantity"),
                sum("TotalSales").alias("TotalProductSales")
            ) \
            .orderBy(col("TotalProductSales").desc()) \
            .limit(10).toPandas()

        plt.figure(figsize=(15, 6))
        plt.subplot(121)
        sns.barplot(x='Description', y='TotalQuantity', data=product_sales)
        plt.title('Top 10 Products by Quantity Sold')
        plt.xticks(rotation=90)

        plt.subplot(122)
        sns.barplot(x='Description', y='TotalProductSales', data=product_sales)
        plt.title('Top 10 Products by Total Sales')
        plt.xticks(rotation=90)
        plt.tight_layout()

        plt.savefig('product_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()

    def customer_segmentation(self):
        """
        Perform customer segmentation with visualization
        """
        # Prepare customer features for clustering
        customer_features = self.df.groupBy("CustomerID") \
            .agg(
                sum("TotalSales").alias("TotalSpent"),
                count("InvoiceNo").alias("TransactionCount")
            ).toPandas()

        # Normalize features
        from sklearn.preprocessing import StandardScaler
        scaler = StandardScaler()
        features = scaler.fit_transform(customer_features[['TotalSpent', 'TransactionCount']])

        # K-means clustering
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=3, random_state=42)
        customer_features['Segment'] = kmeans.fit_predict(features)

        plt.figure(figsize=(12, 6))
        sns.scatterplot(
            x='TotalSpent',
            y='TransactionCount',
            hue='Segment',
            data=customer_features,
            palette='viridis'
        )
        plt.title('Customer Segmentation')
        plt.xlabel('Total Spending')
        plt.ylabel('Number of Transactions')
        plt.savefig('customer_segmentation.png', dpi=300, bbox_inches='tight')
        plt.close()

    def generate_comprehensive_report(self):
        """
        Generate a comprehensive analysis with visualizations
        """
        self.country_sales_visualization()
        self.time_series_analysis()
        self.product_analysis()
        self.customer_segmentation()

        print("Comprehensive analysis complete. Generated visualizations:")
        print("1. country_sales_analysis.png")
        print("2. time_series_analysis.png")
        print("3. product_analysis.png")
        print("4. customer_segmentation.png")

def main():
    # Replace with your actual file path
    file_path = "Online Retail.csv"

    # Create analysis instance
    analysis = RetailDataAnalysis(file_path)

    # Generate comprehensive report
    analysis.generate_comprehensive_report()

if __name__ == "__main__":
    main()

Comprehensive analysis complete. Generated visualizations:
1. country_sales_analysis.png
2. time_series_analysis.png
3. product_analysis.png
4. customer_segmentation.png


In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, sum, avg, count, max, min, year, month, dayofmonth, desc, udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType, ArrayType
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.clustering import KMeans, GaussianMixture
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, ClusteringEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors

# Data Visualization Libraries
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from scipy.stats import zscore

class AdvancedRetailDataAnalysis:
    def __init__(self, file_path):
        """
        Initialize Spark session with advanced configurations

        Args:
            file_path (str): Path to the UCI Online Retail CSV file
        """
        self.spark = SparkSession.builder \
            .appName("Advanced UCI Online Retail Analysis") \
            .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.dynamicAllocation.enabled", "true") \
            .getOrCreate()

        # Advanced visualization setup
        plt.style.use("seaborn-v0_8-dark")

        # Load and preprocess data
        self.df = self.load_and_preprocess_data(file_path)

    def load_and_preprocess_data(self, file_path):
        """
        Advanced data loading and preprocessing
        """
        schema = StructType([
            StructField("InvoiceNo", StringType(), True),
            StructField("StockCode", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), True),
            StructField("InvoiceDate", TimestampType(), True),
            StructField("UnitPrice", DoubleType(), True),
            StructField("CustomerID", StringType(), True),
            StructField("Country", StringType(), True)
        ])

        df = self.spark.read \
            .format("csv") \
            .option("header", "true") \
            .schema(schema) \
            .load(file_path)

        # Advanced cleaning and feature engineering
        df = df.filter(col("Quantity") > 0) \
               .filter(col("UnitPrice") > 0) \
               .na.drop(subset=["CustomerID"])

        df = df.withColumn("TotalSales", col("Quantity") * col("UnitPrice")) \
               .withColumn("Year", year("InvoiceDate")) \
               .withColumn("Month", month("InvoiceDate")) \
               .withColumn("DayOfWeek", dayofmonth("InvoiceDate"))

        return df

    def advanced_customer_segmentation(self):
        """
        Advanced customer segmentation using multiple techniques
        """
        # Prepare customer features
        customer_features_df = self.df.groupBy("CustomerID") \
            .agg(
                sum("TotalSales").alias("TotalSpent"),
                count("InvoiceNo").alias("TransactionCount"),
                avg("UnitPrice").alias("AvgUnitPrice"),
                max("TotalSales").alias("MaxSinglePurchase")
            )

        # Convert to Pandas for advanced preprocessing
        customer_features_pd = customer_features_df.toPandas()

        # Normalize features using z-score
        features_scaled = customer_features_pd[['TotalSpent', 'TransactionCount', 'AvgUnitPrice', 'MaxSinglePurchase']].apply(zscore)

        # Multiple clustering techniques
        # 1. K-means Clustering
        from sklearn.cluster import KMeans
        kmeans = KMeans(n_clusters=4, random_state=42)
        kmeans_labels = kmeans.fit_predict(features_scaled)

        # 2. Gaussian Mixture Model
        from sklearn.mixture import GaussianMixture
        gmm = GaussianMixture(n_components=4, random_state=42)
        gmm_labels = gmm.fit_predict(features_scaled)

        # Add cluster labels to DataFrame
        customer_features_pd['KMeans_Cluster'] = kmeans_labels
        customer_features_pd['GMM_Cluster'] = gmm_labels

        # Interactive 3D scatter plot with Plotly
        fig = px.scatter_3d(
            customer_features_pd,
            x='TotalSpent',
            y='TransactionCount',
            z='AvgUnitPrice',
            color='KMeans_Cluster',
            title='Advanced Customer Segmentation',
            labels={'KMeans_Cluster': 'Customer Segment'}
        )
        fig.write_html('advanced_customer_segmentation.html')

    def predictive_product_analysis(self):
        """
        Advanced predictive analysis for product sales
        """
        # Prepare product features for prediction
        product_features = self.df.groupBy("StockCode", "Description") \
            .agg(
                sum("Quantity").alias("TotalQuantity"),
                sum("TotalSales").alias("TotalSales"),
                avg("UnitPrice").alias("AvgPrice")
            )

        # Convert to Vector Assembler format
        assembler = VectorAssembler(inputCols=["TotalQuantity", "AvgPrice"], outputCol="features")
        product_data = assembler.transform(product_features)

        # Split data
        (train_data, test_data) = product_data.randomSplit([0.7, 0.3], seed=42)

        # Multiple regression techniques
        # 1. Linear Regression
        lr = LinearRegression(featuresCol="features", labelCol="TotalSales")
        lr_model = lr.fit(train_data)

        # 2. Random Forest Regression
        rf = RandomForestRegressor(featuresCol="features", labelCol="TotalSales")
        rf_model = rf.fit(train_data)

        # Evaluate models
        lr_predictions = lr_model.transform(test_data)
        rf_predictions = rf_model.transform(test_data)

        evaluator = RegressionEvaluator(labelCol="TotalSales", predictionCol="prediction")

        print("Linear Regression R² Score:", evaluator.evaluate(lr_predictions))
        print("Random Forest R² Score:", evaluator.evaluate(rf_predictions))

        # Visualize feature importance for Random Forest
        feature_importances = pd.DataFrame({
            'feature': ['TotalQuantity', 'AvgPrice'], # Use feature names from VectorAssembler
            'importance': rf_model.featureImportances.toArray() # Convert SparseVector to NumPy array
        })

        plt.figure(figsize=(10, 6))
        sns.barplot(x='feature', y='importance', data=feature_importances)
        plt.title('Feature Importance in Product Sales Prediction')
        plt.tight_layout()
        plt.savefig('product_sales_feature_importance.png')
        plt.close()

    def temporal_pattern_discovery(self):
        """
        Advanced temporal pattern analysis using machine learning
        """
        # Time-based features
        time_features = self.df.groupBy("Year", "Month") \
            .agg(
                sum("TotalSales").alias("MonthlySales"),
                count("InvoiceNo").alias("TransactionCount")
            ) \
            .orderBy("Year", "Month")

        # Convert to Pandas for advanced time series analysis
        time_df = time_features.toPandas()

        # Seasonal decomposition
        from statsmodels.tsa.seasonal import seasonal_decompose

        # Assuming we want to decompose monthly sales
        decomposition = seasonal_decompose(time_df['MonthlySales'], period=2)

        # Create subplots for decomposition
        plt.figure(figsize=(15, 10))
        plt.subplot(411)
        plt.plot(decomposition.observed)
        plt.title('Observed')
        plt.subplot(412)
        plt.plot(decomposition.trend)
        plt.title('Trend')
        plt.subplot(413)
        plt.plot(decomposition.seasonal)
        plt.title('Seasonal')
        plt.subplot(414)
        plt.plot(decomposition.resid)
        plt.title('Residual')
        plt.tight_layout()
        plt.savefig('sales_decomposition.png')
        plt.close()

    def generate_advanced_report(self):
        """
        Generate comprehensive advanced analysis
        """
        self.advanced_customer_segmentation()
        self.predictive_product_analysis()
        self.temporal_pattern_discovery()

        print("Advanced analysis complete with:")
        print("1. Interactive 3D Customer Segmentation (advanced_customer_segmentation.html)")
        print("2. Product Sales Feature Importance (product_sales_feature_importance.png)")
        print("3. Sales Temporal Decomposition (sales_decomposition.png)")

def main():
    # Replace with your actual file path
    file_path = "Online Retail.csv"

    # Create advanced analysis instance
    analysis = AdvancedRetailDataAnalysis(file_path)

    # Generate advanced report
    analysis.generate_advanced_report()

if __name__ == "__main__":
    main()

Linear Regression R² Score: 3577.530374378818
Random Forest R² Score: 2711.07845662271
Advanced analysis complete with:
1. Interactive 3D Customer Segmentation (advanced_customer_segmentation.html)
2. Product Sales Feature Importance (product_sales_feature_importance.png)
3. Sales Temporal Decomposition (sales_decomposition.png)


In [None]:
# Required installations for Google Colab environment
print("Installing required packages...")
# !pip install pyspark==3.4.1 sparknlp seaborn matplotlib pandas
print("Package installation complete.")

# Core imports
print("\nImporting required libraries...")
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, udf, lower, regexp_replace, explode
)
from pyspark.sql.types import (
    ArrayType, StringType, StructType, StructField,
    IntegerType, TimestampType, DoubleType
)
from pyspark.ml.feature import HashingTF, IDF, Tokenizer as MLTokenizer
from pyspark.ml.clustering import KMeans as MLKMeans
from pyspark.ml import Pipeline

# Spark NLP Imports
import sparknlp
from sparknlp.base import DocumentAssembler, LightPipeline
from sparknlp.annotator import (
    Tokenizer, Normalizer, StopWordsCleaner,
    WordEmbeddingsModel, SentenceDetector, NerDLModel
)

# Visualization Libraries
import matplotlib.pyplot as plt
import seaborn as sns
print("Library imports complete.")

class SparkNLPRetailAnalysis:
    """
    A comprehensive class for analyzing retail data using Spark NLP with detailed progress tracking.
    """

    def __init__(self, file_path: str):
        """Initialize the SparkNLPRetailAnalysis with progress tracking."""
        print("\nInitializing SparkNLPRetailAnalysis...")
        print("Starting Spark session with NLP support...")
        self.spark = sparknlp.start()
        print("Spark session started successfully.")

        print("\nConfiguring Spark settings for optimal performance...")
        self._configure_spark_session()
        print("Spark configuration complete.")

        print("\nLoading and preprocessing retail data...")
        self.df = self.load_retail_data(file_path)
        print(f"Data loaded successfully. Row count: {self.df.count()}")

        print("\nCreating NLP pipeline...")
        self.nlp_pipeline = self.create_nlp_pipeline()
        print("NLP pipeline created successfully.")

    def _configure_spark_session(self):
        """Configure Spark session with optimization settings."""
        configs = {
            "spark.sql.shuffle.partitions": "200",
            "spark.default.parallelism": "100",
            # "spark.executor.memory": "2g",
            # "spark.driver.memory": "2g"
        }
        for key, value in configs.items():
            print(f"Setting {key} to {value}")
            self.spark.conf.set(key, value)

    def load_retail_data(self, file_path: str):
        """Load and preprocess the retail dataset with progress tracking."""
        print("\nDefining schema for retail data...")
        schema = StructType([
            StructField("InvoiceNo", StringType(), True),
            StructField("StockCode", StringType(), True),
            StructField("Description", StringType(), True),
            StructField("Quantity", IntegerType(), True),
            StructField("InvoiceDate", TimestampType(), True),
            StructField("UnitPrice", DoubleType(), True),
            StructField("CustomerID", StringType(), True),
            StructField("Country", StringType(), True)
        ])
        print("Schema defined successfully.")

        print("\nLoading data from CSV...")
        df = (
            self.spark.read.format("csv")
            .option("header", "true")
            .schema(schema)
            .load(file_path)
        )
        print(f"Initial data load complete. Row count: {df.count()}")

        print("\nFiltering for non-null descriptions...")
        df = df.filter(col("Description").isNotNull())
        print(f"Filtered data row count: {df.count()}")

        print("\nCleaning description text...")
        df = df.withColumn(
            "CleanDescription",
            lower(regexp_replace(col("Description"), "[^a-zA-Z\\s]", ""))
        )
        print("Text cleaning complete.")

        print("\nRepartitioning dataset...")
        df = df.repartition(50)
        print("Data preparation complete.")

        return df

    def create_nlp_pipeline(self):
        """Create advanced NLP processing pipeline with progress tracking."""
        print("\nInitializing NLP pipeline components...")

        print("Setting up DocumentAssembler...")
        documentAssembler = DocumentAssembler() \
            .setInputCol("CleanDescription") \
            .setOutputCol("document")

        print("Setting up SentenceDetector...")
        sentenceDetector = SentenceDetector() \
            .setInputCols(["document"]) \
            .setOutputCol("sentence")

        print("Setting up Tokenizer...")
        tokenizer = Tokenizer() \
            .setInputCols(["sentence"]) \
            .setOutputCol("token")

        print("Setting up Normalizer...")
        normalizer = Normalizer() \
            .setInputCols(["token"]) \
            .setOutputCol("normalized")

        print("Setting up StopWordsCleaner...")
        stopWordsCleaner = StopWordsCleaner() \
            .setInputCols(["normalized"]) \
            .setOutputCol("cleanTokens")

        print("Loading word embeddings model...")
        embeddings = WordEmbeddingsModel.pretrained("glove_100d") \
            .setInputCols(["document", "token"]) \
            .setOutputCol("embeddings")
        print("Word embeddings loaded successfully.")

        print("Loading NER model...")
        nerModel = NerDLModel.pretrained("ner_dl", "en") \
            .setInputCols(["document", "token", "embeddings"]) \
            .setOutputCol("ner")
        print("NER model loaded successfully.")

        print("Assembling complete pipeline...")
        return sparknlp.base.Pipeline(stages=[
            documentAssembler,
            sentenceDetector,
            tokenizer,
            normalizer,
            stopWordsCleaner,
            embeddings,
            nerModel
        ])

    def perform_text_analysis(self):
        """Perform comprehensive text analysis with detailed progress tracking."""
        print("\nStarting text analysis process...")

        print("Fitting NLP pipeline to data...")
        nlp_model = self.nlp_pipeline.fit(self.df)
        print("Pipeline fitted successfully.")

        print("Transforming data through NLP pipeline...")
        processed_df = nlp_model.transform(self.df)
        print("NLP transformation complete.")

        print("\nDefining entity extraction UDF...")
        def extract_entities(ner_annotations):
            if not ner_annotations:
                return []
            return [
                (entity.result, entity.metadata['entity'])
                for entity in ner_annotations
                if 'entity' in entity.metadata
            ]

        extract_entities_udf = udf(
            extract_entities,
            ArrayType(
                StructType([
                    StructField("text", StringType(), True),
                    StructField("entity_type", StringType(), True)
                ])
            )
        )
        print("Entity extraction UDF defined.")

        print("\nExtracting named entities...")
        entity_df = processed_df.withColumn(
            "extracted_entities",
            extract_entities_udf(col("ner"))
        )
        print("Entity extraction complete.")

        print("\nAnalyzing entity distribution...")
        # entity_analysis = (
        #     entity_df.select(explode("extracted_entities").alias("entity"))
        #     .groupBy("entity.entity_type")
        #     .count()
        #     .orderBy(col("count").desc())
        #     .toPandas()
        # )
        exploded_entities = entity_df.select(explode("extracted_entities").alias("entity")).cache()
        spark_entity_analysis = (
            exploded_entities
            .select(col("entity.entity_type").alias("entity_type"))
            .groupBy("entity_type")
            .agg(expr("count(1)").alias("count"))
            .orderBy(col("count").desc())
        )

        # Convert to Pandas after reducing the data size
        entity_analysis = spark_entity_analysis.limit(50).toPandas()

        print("Entity distribution analysis complete.")

        print("\nGenerating entity distribution visualization...")
        plt.figure(figsize=(12, 6))
        sns.barplot(x='entity_type', y='count', data=entity_analysis)
        plt.title('Distribution of Named Entities in Product Descriptions')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig('named_entities_distribution.png')
        plt.close()
        print("Entity distribution visualization saved.")

        print("\nAnalyzing word frequencies...")
        word_freq = (
            processed_df.select(explode("cleanTokens.result").alias("word"))
            .groupBy("word")
            .count()
            .orderBy(col("count").desc())
            .limit(50)
            .toPandas()
        )
        print("Word frequency analysis complete.")

        print("\nGenerating word frequency visualization...")
        plt.figure(figsize=(15, 6))
        sns.barplot(x='word', y='count', data=word_freq.head(20))
        plt.title('Top 20 Most Frequent Words in Product Descriptions')
        plt.xticks(rotation=90)
        plt.tight_layout()
        plt.savefig('word_frequency.png')
        plt.close()
        print("Word frequency visualization saved.")

        print("\nSetting up text clustering pipeline...")
        tokenizer = MLTokenizer(inputCol="CleanDescription", outputCol="words")
        hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5000)
        idf = IDF(inputCol="rawFeatures", outputCol="features")
        kmeans = MLKMeans(k=5, featuresCol="features")

        clustering_pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, kmeans])
        print("Clustering pipeline created.")

        print("\nPerforming text clustering...")
        model = clustering_pipeline.fit(self.df)
        clustered_df = model.transform(self.df)
        print("Text clustering complete.")

        print("\nText Analysis Process Complete!")
        print("Generated outputs:")
        print("1. named_entities_distribution.png")
        print("2. word_frequency.png")

        return clustered_df

def main():
    """Main execution function with progress tracking."""
    print("\n" + "="*50)
    print("Starting Retail Analysis Process")
    print("="*50)

    # Replace with your CSV file path
    file_path = "/content/Online Retail.csv"

    print("\nInitializing analysis...")
    nlp_analysis = SparkNLPRetailAnalysis(file_path)

    print("\nStarting text analysis...")
    nlp_analysis.perform_text_analysis()

    print("\n" + "="*50)
    print("Analysis Complete!")
    print("="*50)

if __name__ == "__main__":
    main()

Installing required packages...
Package installation complete.

Importing required libraries...
Library imports complete.

Starting Retail Analysis Process

Initializing analysis...

Initializing SparkNLPRetailAnalysis...
Starting Spark session with NLP support...
Spark session started successfully.

Configuring Spark settings for optimal performance...
Setting spark.sql.shuffle.partitions to 200
Setting spark.default.parallelism to 100
Spark configuration complete.

Loading and preprocessing retail data...

Defining schema for retail data...
Schema defined successfully.

Loading data from CSV...


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots

class RetailDataVisualizer:
    def __init__(self, file_path):
        """
        Initialize visualizer with retail dataset
        """
        # Read the dataset
        self.df = pd.read_csv(file_path, parse_dates=['InvoiceDate'], encoding='unicode_escape')

        # Clean and preprocess data
        self.preprocess_data()

        # Set up color palette
        self.color_palette = px.colors.sequential.Viridis

    def preprocess_data(self):
        """
        Clean and prepare the dataset for visualization
        """
        # Remove negative quantities and zero prices
        self.df = self.df[(self.df['Quantity'] > 0) & (self.df['UnitPrice'] > 0)]

        # Calculate total sales
        self.df['TotalSales'] = self.df['Quantity'] * self.df['UnitPrice']

        # Convert InvoiceDate to datetime if not already
        self.df['InvoiceDate'] = pd.to_datetime(self.df['InvoiceDate'])

        # Extract additional time features
        self.df['Year'] = self.df['InvoiceDate'].dt.year
        self.df['Month'] = self.df['InvoiceDate'].dt.month
        self.df['DayOfWeek'] = self.df['InvoiceDate'].dt.day_name()

    def generate_advanced_visualizations(self):
        """
        Create comprehensive visualizations from the retail dataset
        """
        # 1. Geographical Sales Distribution
        country_sales = self.df.groupby('Country')['TotalSales'].sum().reset_index()

        # Plotly Geographical Heatmap
        fig_geo = px.scatter_geo(
            country_sales,
            locations='Country',
            locationmode='country names',
            color='TotalSales',
            size='TotalSales',
            hover_name='Country',
            color_continuous_scale=self.color_palette,
            projection='natural earth',
            title='Global Sales Distribution by Country'
        )
        fig_geo.write_html('global_sales_distribution.html')

        # 2. Time Series Analysis of Monthly Sales
        monthly_sales = self.df.groupby([self.df['InvoiceDate'].dt.to_period('M')])['TotalSales'].sum().reset_index()
        monthly_sales['InvoiceDate'] = monthly_sales['InvoiceDate'].dt.to_timestamp()

        # Interactive Time Series Plot
        fig_time = go.Figure()
        fig_time.add_trace(go.Scatter(
            x=monthly_sales['InvoiceDate'],
            y=monthly_sales['TotalSales'],
            mode='lines+markers',
            name='Monthly Sales',
            line=dict(color=self.color_palette[3], width=3)
        ))

        fig_time.update_layout(
            title='Monthly Sales Trend',
            xaxis_title='Date',
            yaxis_title='Total Sales',
            hovermode='x unified'
        )
        fig_time.write_html('monthly_sales_trend.html')

        # 3. Product Category Analysis
        # Use Description as a proxy for product category
        product_sales = self.df.groupby('Description')['TotalSales'].agg(['sum', 'count']).reset_index()
        product_sales = product_sales.sort_values('sum', ascending=False).head(15)

        # Plotly Bar Chart
        fig_products = px.bar(
            product_sales,
            x='Description',
            y='sum',
            color='count',
            title='Top 15 Products by Total Sales',
            labels={'sum': 'Total Sales', 'count': 'Transaction Count'},
            color_continuous_scale=self.color_palette
        )
        fig_products.update_xaxes(tickangle=45)
        fig_products.write_html('top_products_sales.html')

        # 4. Customer Segmentation Visualization
        # Recency, Frequency, Monetary (RFM) Analysis
        from datetime import datetime

        # Calculate RFM metrics
        rfm_data = self.df.groupby('CustomerID').agg({
            'InvoiceDate': lambda x: (datetime.now() - x.max()).days,  # Recency
            'InvoiceNo': 'count',  # Frequency
            'TotalSales': 'sum'  # Monetary
        }).reset_index()

        rfm_data.columns = ['CustomerID', 'Recency', 'Frequency', 'Monetary']

        # 3D Scatter for Customer Segmentation
        fig_customers = px.scatter_3d(
            rfm_data,
            x='Recency',
            y='Frequency',
            z='Monetary',
            color='Monetary',
            size='Monetary',
            hover_data=['CustomerID'],
            title='Customer Segmentation: Recency, Frequency, Monetary',
            color_continuous_scale=self.color_palette
        )
        fig_customers.write_html('customer_segmentation_3d.html')

        # 5. Correlation Heatmap of Sales Metrics
        # Create correlation matrix of derived features
        correlation_df = rfm_data[['Recency', 'Frequency', 'Monetary']].corr()

        plt.figure(figsize=(10, 8))
        sns.heatmap(
            correlation_df,
            annot=True,
            cmap='coolwarm',
            center=0,
            vmin=-1,
            vmax=1,
            square=True
        )
        plt.title('Correlation of Customer Metrics')
        plt.tight_layout()
        plt.savefig('customer_metrics_correlation.png', dpi=300)
        plt.close()

        print("Advanced Retail Data Visualizations Generated:")
        print("1. global_sales_distribution.html")
        print("2. monthly_sales_trend.html")
        print("3. top_products_sales.html")
        print("4. customer_segmentation_3d.html")
        print("5. customer_metrics_correlation.png")

def main():
    # Replace with your actual file path
    file_path = "Online Retail.csv"

    visualizer = RetailDataVisualizer(file_path)
    visualizer.generate_advanced_visualizations()

if __name__ == "__main__":
    main()

Advanced Retail Data Visualizations Generated:
1. global_sales_distribution.html
2. monthly_sales_trend.html
3. top_products_sales.html
4. customer_segmentation_3d.html
5. customer_metrics_correlation.png


In [None]:
import pandas as pd
import plotly.express as px
import numpy as np

# Read the UCI Online Retail dataset
def load_and_preprocess_data(filepath):
    """
    Load and preprocess the UCI Online Retail dataset for geographical visualization

    Parameters:
    filepath (str): Path to the Excel file

    Returns:
    pd.DataFrame: Aggregated sales data by country
    """
    # Read the dataset
    df = pd.read_csv(filepath)

    # Basic cleaning
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    # Group by country and aggregate sales
    country_sales = df.groupby('Country')['TotalSales'].sum().reset_index()

    # Log transform sales (adding 1 to handle zero values)
    country_sales['LogSales'] = np.log10(country_sales['TotalSales'] + 1)

    return country_sales

# Create geographical plot
def create_geo_plot(country_sales):
    """
    Create a geographical plot of sales data

    Parameters:
    country_sales (pd.DataFrame): Dataframe with country sales data

    Returns:
    Plotly Figure object
    """
    fig = px.choropleth(
        country_sales,
        locations='Country',
        locationmode='country names',
        color='LogSales',
        hover_name='Country',
        hover_data=['TotalSales'],
        color_continuous_scale='Viridis',
        scope='europe',
        title='European Online Retail Sales (Log Scaled)'
    )

    # Customize the layout
    fig.update_geos(
        showframe=False,
        showcoastlines=True,
        projection_type='mercator'
    )

    return fig

# Main execution
def main(filepath):
    """
    Main function to load data and create visualization

    Parameters:
    filepath (str): Path to the UCI Online Retail dataset
    """
    # Load and preprocess data
    country_sales = load_and_preprocess_data(filepath)

    # Create and show the plot
    fig = create_geo_plot(country_sales)
    fig.show()
    fig.write_html('european_retail_sales.html')

# Example usage
# Assuming you have the UCI Online Retail dataset
main('Online Retail.csv')

# Note: You'll need to have these libraries installed:
# pip install pandas plotly openpyxl numpy

In [None]:
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import numpy as np

def load_and_preprocess_data(filepath):
    """
    Comprehensive data loading and preprocessing
    """
    # Read the dataset
    df = pd.read_csv(filepath)

    # Clean and transform data
    df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    return df

def create_enhanced_sales_intensity_heatmap(df):
    """
    Create a more detailed sales intensity heatmap with refined time buckets
    """
    # Create more granular time buckets
    def create_time_bucket(hour):
        if 0 <= hour < 3:
            return '00:00-03:00 (Late Night)'
        elif 3 <= hour < 6:
            return '03:00-06:00 (Early Morning)'
        elif 6 <= hour < 9:
            return '06:00-09:00 (Morning)'
        elif 9 <= hour < 12:
            return '09:00-12:00 (Late Morning)'
        elif 12 <= hour < 15:
            return '12:00-15:00 (Early Afternoon)'
        elif 15 <= hour < 18:
            return '15:00-18:00 (Late Afternoon)'
        elif 18 <= hour < 21:
            return '18:00-21:00 (Evening)'
        else:
            return '21:00-00:00 (Night)'

    # Extract hour and prepare data
    df['Hour'] = df['InvoiceDate'].dt.hour
    df['TimeBucket'] = df['Hour'].apply(create_time_bucket)
    df['DayOfWeek'] = df['InvoiceDate'].dt.day_name()

    # Define day order
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

    # Time bucket order
    time_bucket_order = [
        '00:00-03:00 (Late Night)',
        '03:00-06:00 (Early Morning)',
        '06:00-09:00 (Morning)',
        '09:00-12:00 (Late Morning)',
        '12:00-15:00 (Early Afternoon)',
        '15:00-18:00 (Late Afternoon)',
        '18:00-21:00 (Evening)',
        '21:00-00:00 (Night)'
    ]

    # Aggregate sales by day and time bucket
    hourly_sales = df.groupby(['DayOfWeek', 'TimeBucket'])['TotalSales'].agg([
        'sum',      # Total sales
        'count',    # Number of transactions
        'mean'      # Average transaction value
    ]).reset_index()

    # Prepare categorical data
    hourly_sales['DayOfWeek'] = pd.Categorical(hourly_sales['DayOfWeek'], categories=day_order, ordered=True)
    hourly_sales['TimeBucket'] = pd.Categorical(hourly_sales['TimeBucket'], categories=time_bucket_order, ordered=True)
    hourly_sales = hourly_sales.sort_values(['DayOfWeek', 'TimeBucket'])

    # Create multiple visualizations

    # 1. Sales Total Heatmap
    fig_total = px.density_heatmap(
        hourly_sales,
        x='TimeBucket',
        y='DayOfWeek',
        z='sum',
        title='Sales Intensity Heatmap: Total Sales',
        labels={'sum': 'Total Sales (£)'},
        color_continuous_scale='magma',
        height=600,
        width=1000
    )
    fig_total.update_xaxes(tickangle=45)
    fig_total.write_html('sales_intensity_total.html')

    # 2. Transaction Count Heatmap
    fig_count = px.density_heatmap(
        hourly_sales,
        x='TimeBucket',
        y='DayOfWeek',
        z='count',
        title='Sales Intensity Heatmap: Number of Transactions',
        labels={'count': 'Number of Transactions'},
        color_continuous_scale='viridis',
        height=600,
        width=1000
    )
    fig_count.update_xaxes(tickangle=45)
    fig_count.write_html('sales_intensity_count.html')

    # 3. Average Transaction Value Heatmap
    fig_avg = px.density_heatmap(
        hourly_sales,
        x='TimeBucket',
        y='DayOfWeek',
        z='mean',
        title='Sales Intensity Heatmap: Average Transaction Value',
        labels={'mean': 'Average Transaction Value (£)'},
        color_continuous_scale='plasma',
        height=600,
        width=1000
    )
    fig_avg.update_xaxes(tickangle=45)
    fig_avg.write_html('sales_intensity_avg.html')

    # 4. Interactive Bubble Plot for Comprehensive View
    fig_bubble = go.Figure(data=[go.Scatter(
        x=hourly_sales['TimeBucket'],
        y=hourly_sales['DayOfWeek'],
        mode='markers',
        marker=dict(
            size=hourly_sales['count'] / 50,  # Size based on transaction count
            color=hourly_sales['sum'],  # Color based on total sales
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title='Total Sales (£)')
        ),
        text=hourly_sales.apply(lambda row:
            f"Day: {row['DayOfWeek']}<br>" +
            f"Time: {row['TimeBucket']}<br>" +
            f"Total Sales: £{row['sum']:,.2f}<br>" +
            f"Transactions: {row['count']}<br>" +
            f"Avg Transaction: £{row['mean']:,.2f}", axis=1),
        hoverinfo='text'
    )])

    fig_bubble.update_layout(
        title='Comprehensive Sales Intensity Visualization',
        xaxis_title='Time Bucket',
        yaxis_title='Day of Week',
        height=600,
        width=1000
    )
    fig_bubble.update_xaxes(tickangle=45)
    fig_bubble.write_html('sales_intensity_bubble.html')

    # Print summary statistics
    print("Sales Intensity Analysis Summary:")
    print(hourly_sales.groupby('TimeBucket')[['sum', 'count', 'mean']].sum())

# Main execution
def main(filepath):
    """
    Main function to load data and create visualizations
    """
    # Load data
    df = load_and_preprocess_data(filepath)

    # Create enhanced sales intensity visualization
    create_enhanced_sales_intensity_heatmap(df)

# Example usage
main('Online Retail.csv')

# Note: Requires these libraries
# pip install pandas plotly openpyxl numpy

Sales Intensity Analysis Summary:
                                       sum   count        mean
TimeBucket                                                    
00:00-03:00 (Late Night)             0.000       0    0.000000
03:00-06:00 (Early Morning)          0.000       0    0.000000
06:00-09:00 (Morning)           310615.240    9216  167.821549
09:00-12:00 (Late Morning)     2860310.702  111664  150.577283
12:00-15:00 (Early Afternoon)  3408371.021  193479  106.712981
15:00-18:00 (Late Afternoon)   1559325.481   84951  108.648023
18:00-21:00 (Evening)           161443.370    7519  484.786041
21:00-00:00 (Night)                  0.000       0    0.000000






In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objs as go
import networkx as nx
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.cluster import DBSCAN
from tqdm import tqdm
import logging
import time
from typing import Dict, List, Optional
from scipy.sparse import csr_matrix

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

class RetailDataAnalyzer:
    def __init__(self, filepath: str):
        """
        Initialize the analyzer with performance tracking and logging.

        :param filepath: Path to the retail dataset CSV
        """
        self.start_time = time.time()
        self.filepath = filepath
        self.df = None
        logger.info(f"Retail Data Analyzer initialized with file: {filepath}")

    def load_and_preprocess_data(self) -> pd.DataFrame:
        """
        Enhanced data loading with comprehensive preprocessing and performance logging.

        :return: Preprocessed pandas DataFrame
        """
        logger.info("Starting data loading and preprocessing...")
        preprocessing_start = time.time()

        # Read with optimized parameters
        df = pd.read_csv(
            self.filepath,
            parse_dates=['InvoiceDate'],  # Built-in datetime parsing
            infer_datetime_format=True,   # Faster datetime inference
            low_memory=False              # Handle mixed data types
        )

        # Vectorized operations for efficiency
        df['TotalSales'] = df['Quantity'] * df['UnitPrice']

        # Efficient datetime extraction
        df['Year'] = df['InvoiceDate'].dt.year
        df['Month'] = df['InvoiceDate'].dt.month
        df['DayOfWeek'] = df['InvoiceDate'].dt.day_name()

        # Remove potential duplicates and invalid entries
        df.drop_duplicates(inplace=True)
        df.dropna(subset=['Description', 'Quantity', 'UnitPrice'], inplace=True)

        self.df = df
        preprocessing_time = time.time() - preprocessing_start
        logger.info(f"Data preprocessing completed in {preprocessing_time:.2f} seconds")

        return df

    def create_parallel_coordinates(self) -> None:
        """Create parallel coordinates visualization with performance optimization."""
        logger.info("Generating Parallel Coordinates Visualization...")

        product_performance = (
            self.df.groupby('Description')
            .agg({
                'Quantity': ['sum', 'mean'],
                'UnitPrice': ['mean', 'max'],
                'TotalSales': ['sum', 'mean']
            })
            .reset_index()
        )

        # Flatten multi-level columns efficiently
        product_performance.columns = [
            'Description', 'Total_Quantity', 'Avg_Quantity',
            'Avg_Price', 'Max_Price', 'Total_Sales', 'Avg_Sales'
        ]

        # Normalize with sklearn
        scaler = MinMaxScaler()
        columns_to_normalize = [
            'Total_Quantity', 'Avg_Quantity', 'Avg_Price',
            'Max_Price', 'Total_Sales', 'Avg_Sales'
        ]
        product_performance[columns_to_normalize] = scaler.fit_transform(
            product_performance[columns_to_normalize]
        )

        # Use Plotly's optimized rendering
        fig = go.Figure(data=
            go.Parcoords(
                line=dict(
                    color=product_performance['Total_Sales'],
                    colorscale='Viridis',
                    showscale=True
                ),
                dimensions=[
                    dict(range=[0, 1], label=col, values=product_performance[col])
                    for col in columns_to_normalize
                ]
            )
        )

        fig.update_layout(
            title='Optimized Product Performance Analysis',
            height=800, width=1200
        )
        fig.write_html('parallel_coordinates_performance.html')
        logger.info("Parallel Coordinates Visualization complete.")

    def create_product_network(self) -> None:
        """Create an improved product co-purchase network graph."""
        logger.info("Generating Improved Product Network Visualization...")

        # Efficient pivot and co-occurrence calculation
        product_invoice = self.df.pivot_table(
            index='InvoiceNo',
            columns='Description',
            aggfunc='size',
            fill_value=0
        )

        sparse_matrix = csr_matrix(product_invoice.values)

        # Perform dot product on sparse matrix
        product_pairs = sparse_matrix.T.dot(sparse_matrix)

        # Convert back to DataFrame
        product_pairs = pd.DataFrame(product_pairs.toarray(),
                                    index=product_invoice.columns,
                                    columns=product_invoice.columns)
        np.fill_diagonal(product_pairs.values, 0)

        # Extract top co-purchase relationships
        product_pairs.index.name = 'Product_A'
        product_pairs.columns.name = None
        co_purchase_counts = (
            product_pairs.stack()
            .reset_index(name='Frequency')
            .query("Frequency > 0")
        )
        co_purchase_counts.columns = ['Product_A', 'Product_B', 'Frequency']
        top_co_purchases = co_purchase_counts.nlargest(100, 'Frequency')

        # Build the NetworkX graph
        G = nx.from_pandas_edgelist(
            top_co_purchases,
            'Product_A',
            'Product_B',
            edge_attr='Frequency'
        )

        # Calculate node centrality to determine size
        node_centrality = nx.degree_centrality(G)
        node_sizes = [10 + node_centrality[node] * 100 for node in G.nodes()]

        # Generate positions with force-directed layout
        pos = nx.spring_layout(G, k=0.5, seed=42)

        # Create the Plotly figure
        fig = go.Figure()

        # Add edges
        for edge in G.edges(data=True):
            x0, y0 = pos[edge[0]]
            x1, y1 = pos[edge[1]]
            fig.add_trace(go.Scatter(
                x=[x0, x1, None],
                y=[y0, y1, None],
                mode='lines',
                line=dict(width=edge[2]['Frequency']/100, color='gray'),
                hoverinfo='none'
            ))

        # Add nodes
        for i, node in enumerate(G.nodes(data=False)):  # Use enumerate to get the index
            x, y = pos[node]
            fig.add_trace(go.Scatter(
                x=[x],
                y=[y],
                mode='markers+text',
                marker=dict(size=node_sizes[i], color='blue', opacity=0.8), # Use i as the index
                text=node,
                textposition='top center',
                hoverinfo='text',
                name=node
            ))

        # Update layout for better aesthetics
        fig.update_layout(
            title="Improved Product Co-purchase Network",
            title_x=0.5,
            showlegend=False,
            xaxis=dict(showgrid=False, zeroline=False),
            yaxis=dict(showgrid=False, zeroline=False),
            height=800,
            width=1200
        )

        # Save the plot to an HTML file
        fig.write_html('improved_product_network.html')
        logger.info("Improved Product Network Visualization complete.")


    def execute_analysis(self) -> None:
        """
        Execute the full analysis pipeline with performance tracking.
        """
        logger.info("Starting comprehensive retail data analysis...")

        # Load and preprocess data
        self.load_and_preprocess_data()

        # Generate visualizations
        visualization_methods = [
            self.create_parallel_coordinates,
            self.create_product_network
        ]

        for method in visualization_methods:
            method_start = time.time()
            method()
            logger.info(f"{method.__name__} completed in {time.time() - method_start:.2f} seconds")

        total_time = time.time() - self.start_time
        logger.info(f"Total analysis completed in {total_time:.2f} seconds")

def main():
    """Main execution function."""
    analyzer = RetailDataAnalyzer('Online Retail.csv')
    analyzer.execute_analysis()

if __name__ == "__main__":
    main()

# Performance dependencies:
# pip install pandas plotly networkx scikit-learn numpy tqdm


The argument 'infer_datetime_format' is deprecated and will be removed in a future version. A strict version of it is now the default, see https://pandas.pydata.org/pdeps/0004-consistent-to-datetime-parsing.html. You can safely remove this argument.



In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objs as go
import networkx as nx
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.cluster import DBSCAN
from tqdm import tqdm
import logging
import time
from typing import Dict, List, Optional
from scipy.sparse import csr_matrix
from scipy import stats

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

class RetailDataAnalyzer:
    def __init__(self, filepath: str):
        """
        Initialize the analyzer with performance tracking and logging.

        :param filepath: Path to the retail dataset CSV
        """
        self.start_time = time.time()
        self.filepath = filepath
        self.df = None
        logger.info(f"Retail Data Analyzer initialized with file: {filepath}")

    def load_and_preprocess_data(self) -> pd.DataFrame:
        """
        Enhanced data loading with comprehensive preprocessing and performance logging.

        :return: Preprocessed pandas DataFrame
        """
        logger.info("Starting data loading and preprocessing...")
        preprocessing_start = time.time()

        # Read with optimized parameters
        df = pd.read_csv(
            self.filepath,
            parse_dates=['InvoiceDate'],  # Built-in datetime parsing
            infer_datetime_format=True,   # Faster datetime inference
            low_memory=False              # Handle mixed data types
        )

        # Vectorized operations for efficiency
        df['TotalSales'] = df['Quantity'] * df['UnitPrice']

        # Efficient datetime extraction
        df['Year'] = df['InvoiceDate'].dt.year
        df['Month'] = df['InvoiceDate'].dt.month
        df['DayOfWeek'] = df['InvoiceDate'].dt.day_name()

        # Remove potential duplicates and invalid entries
        df.drop_duplicates(inplace=True)
        df.dropna(subset=['Description', 'Quantity', 'UnitPrice'], inplace=True)

        self.df = df
        preprocessing_time = time.time() - preprocessing_start
        logger.info(f"Data preprocessing completed in {preprocessing_time:.2f} seconds")

        return df

    def create_parallel_coordinates(self) -> None:
        """Create parallel coordinates visualization with performance optimization."""
        logger.info("Generating Parallel Coordinates Visualization...")

        product_performance = (
            self.df.groupby('Description')
            .agg({
                'Quantity': ['sum', 'mean'],
                'UnitPrice': ['mean', 'max'],
                'TotalSales': ['sum', 'mean']
            })
            .reset_index()
        )

        # Flatten multi-level columns efficiently
        product_performance.columns = [
            'Description', 'Total_Quantity', 'Avg_Quantity',
            'Avg_Price', 'Max_Price', 'Total_Sales', 'Avg_Sales'
        ]

        # Normalize with sklearn
        scaler = MinMaxScaler()
        columns_to_normalize = [
            'Total_Quantity', 'Avg_Quantity', 'Avg_Price',
            'Max_Price', 'Total_Sales', 'Avg_Sales'
        ]
        product_performance[columns_to_normalize] = scaler.fit_transform(
            product_performance[columns_to_normalize]
        )

        # Use Plotly's optimized rendering
        fig = go.Figure(data=
            go.Parcoords(
                line=dict(
                    color=product_performance['Total_Sales'],
                    colorscale='Viridis',
                    showscale=True
                ),
                dimensions=[
                    dict(range=[0, 1], label=col, values=product_performance[col])
                    for col in columns_to_normalize
                ]
            )
        )

        fig.update_layout(
            title='Optimized Product Performance Analysis',
            height=800, width=1200
        )
        fig.write_html('parallel_coordinates_performance.html')
        logger.info("Parallel Coordinates Visualization complete.")

    def create_product_network(self) -> None:
        """Create an improved product co-purchase network graph."""
        logger.info("Generating Improved Product Network Visualization...")

        # Efficient pivot and co-occurrence calculation
        product_invoice = self.df.pivot_table(
            index='InvoiceNo',
            columns='Description',
            aggfunc='size',
            fill_value=0
        )

        sparse_matrix = csr_matrix(product_invoice.values)

        # Perform dot product on sparse matrix
        product_pairs = sparse_matrix.T.dot(sparse_matrix)

        # Convert back to DataFrame
        product_pairs = pd.DataFrame(product_pairs.toarray(),
                                    index=product_invoice.columns,
                                    columns=product_invoice.columns)
        np.fill_diagonal(product_pairs.values, 0)

        # Extract top co-purchase relationships
        product_pairs.index.name = 'Product_A'
        product_pairs.columns.name = None
        co_purchase_counts = (
            product_pairs.stack()
            .reset_index(name='Frequency')
            .query("Frequency > 0")
        )
        co_purchase_counts.columns = ['Product_A', 'Product_B', 'Frequency']
        top_co_purchases = co_purchase_counts.nlargest(100, 'Frequency')

        # Build the NetworkX graph
        G = nx.from_pandas_edgelist(
            top_co_purchases,
            'Product_A',
            'Product_B',
            edge_attr='Frequency'
        )

        # Calculate node centrality to determine size
        node_centrality = nx.degree_centrality(G)
        node_sizes = [10 + node_centrality[node] * 100 for node in G.nodes()]

        # Generate positions with force-directed layout
        pos = nx.spring_layout(G, k=0.5, seed=42)

        # Create the Plotly figure
        fig = go.Figure()

        # Add edges
        for edge in G.edges(data=True):
            x0, y0 = pos[edge[0]]
            x1, y1 = pos[edge[1]]
            fig.add_trace(go.Scatter(
                x=[x0, x1, None],
                y=[y0, y1, None],
                mode='lines',
                line=dict(width=edge[2]['Frequency']/100, color='gray'),
                hoverinfo='none'
            ))

        # Add nodes
        for i, node in enumerate(G.nodes(data=False)):  # Use enumerate to get the index
            x, y = pos[node]
            fig.add_trace(go.Scatter(
                x=[x],
                y=[y],
                mode='markers+text',
                marker=dict(size=node_sizes[i], color='blue', opacity=0.8), # Use i as the index
                text=node,
                textposition='top center',
                hoverinfo='text',
                name=node
            ))

        # Update layout for better aesthetics
        fig.update_layout(
            title="Improved Product Co-purchase Network",
            title_x=0.5,
            showlegend=False,
            xaxis=dict(showgrid=False, zeroline=False),
            yaxis=dict(showgrid=False, zeroline=False),
            height=800,
            width=1200
        )

        # Save the plot to an HTML file
        fig.write_html('improved_product_network.html')
        logger.info("Improved Product Network Visualization complete.")

    def generate_advanced_3d_visualizations(self):
        """
        Create advanced 3D visualizations to provide deep insights into retail data.
        """
        logger.info("Generating Advanced 3D Visualizations...")

        # 1. 3D Scatter Plot of Sales Performance with Depth and Color Encoding
        def create_3d_sales_performance_scatter():
            """
            Create a 3D scatter plot that visualizes product performance across multiple dimensions.
            """
            # Group by product and calculate comprehensive metrics
            product_stats = (
                self.df.groupby('Description')
                .agg({
                    'Quantity': ['sum', 'mean'],
                    'UnitPrice': ['mean', 'max'],
                    'TotalSales': ['sum', 'mean']
                })
                .reset_index()
            )

            # Flatten multi-level columns
            product_stats.columns = [
                'Description', 'Total_Quantity', 'Avg_Quantity',
                'Avg_Price', 'Max_Price', 'Total_Sales', 'Avg_Sales'
            ]

            # Create 3D scatter plot
            fig = go.Figure(data=[go.Scatter3d(
                x=product_stats['Total_Quantity'],
                y=product_stats['Avg_Price'],
                z=product_stats['Total_Sales'],
                mode='markers',
                marker=dict(
                    size=5,
                    color=product_stats['Avg_Sales'],  # color by average sales
                    colorscale='Viridis',
                    opacity=0.8,
                    colorbar=dict(title='Avg Sales')
                ),
                text=product_stats['Description'],
                hoverinfo='text'
            )])

            fig.update_layout(
                title='3D Product Performance Visualization',
                scene=dict(
                    xaxis_title='Total Quantity',
                    yaxis_title='Average Price',
                    zaxis_title='Total Sales'
                ),
                height=800,
                width=1200
            )

            fig.write_html('3d_sales_performance_scatter.html')
            logger.info("3D Sales Performance Scatter Plot complete.")

        # 2. 3D Density Estimation of Purchase Patterns
        def create_3d_purchase_density():
            """
            Create a 3D density plot to visualize the distribution of purchases.
            """
            # Prepare data for kernel density estimation
            daily_sales = (
                self.df.groupby(['Year', 'Month', 'DayOfWeek'])['TotalSales']
                .sum()
                .reset_index()
            )

            # Map day names to numeric values for 3D plotting
            day_map = {day: idx for idx, day in enumerate(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'])}
            daily_sales['DayNumeric'] = daily_sales['DayOfWeek'].map(day_map)

            # Kernel Density Estimation
            x = daily_sales['Month']
            y = daily_sales['DayNumeric']
            z = daily_sales['TotalSales']

            # Create a grid of points
            xi = np.linspace(x.min(), x.max(), 100)
            yi = np.linspace(y.min(), y.max(), 100)
            xi, yi = np.meshgrid(xi, yi)

            # Perform kernel density estimation
            kernel = stats.gaussian_kde(np.vstack([x, y, z]))
            zi = kernel(np.vstack([xi.ravel(), yi.ravel(), z.mean() * np.ones_like(xi.ravel())]))

            # Create 3D surface plot
            fig = go.Figure(data=[
                go.Surface(
                    x=xi,
                    y=yi,
                    z=zi.reshape(xi.shape),
                    colorscale='Plasma',
                    colorbar=dict(title='Sales Density')
                )
            ])

            fig.update_layout(
                title='3D Sales Density Estimation',
                scene=dict(
                    xaxis_title='Month',
                    yaxis_title='Day of Week',
                    zaxis_title='Sales Density'
                ),
                height=800,
                width=1200
            )

            fig.write_html('3d_purchase_density.html')
            logger.info("3D Purchase Density Plot complete.")

        # 3. 3D Surface Plot of Seasonal Sales Trends
        def create_seasonal_sales_surface():
            """
            Generate a 3D surface plot showing seasonal sales trends across different product categories.
            """
            # Aggregate sales by month, year, and product category
            seasonal_sales = (
                self.df.groupby([
                    pd.Grouper(key='InvoiceDate', freq='M'),
                    'Description'
                ])['TotalSales']
                .sum()
                .unstack(fill_value=0)
            )

            # Select top categories
            top_categories = seasonal_sales.sum().nlargest(10).index
            seasonal_subset = seasonal_sales[top_categories]

            # Prepare data for surface plot
            months = seasonal_subset.index.month
            years = seasonal_subset.index.year

            # Create mesh grid
            unique_months = np.unique(months)
            unique_years = np.unique(years)

            # Initialize surface data
            surface_data = np.zeros((len(top_categories), len(unique_years), len(unique_months)))

            for i, category in enumerate(top_categories):
                for j, year in enumerate(unique_years):
                    for k, month in enumerate(unique_months):
                        mask = (months == month) & (years == year)
                        surface_data[i, j, k] = seasonal_subset.loc[mask, category].sum()

            # Create 3D surface plot
            fig = go.Figure()
            for i, category in enumerate(top_categories):
                fig.add_trace(go.Surface(
                    z=surface_data[i],
                    x=unique_years,
                    y=unique_months,
                    name=category,
                    colorscale='Jet',
                    showscale=False
                ))

            fig.update_layout(
                title='3D Seasonal Sales Trends by Top Categories',
                scene=dict(
                    xaxis_title='Year',
                    yaxis_title='Month',
                    zaxis_title='Total Sales'
                ),
                height=800,
                width=1200
            )

            fig.write_html('3d_seasonal_sales_surface.html')
            logger.info("3D Seasonal Sales Surface Plot complete.")

        # Execute all 3D visualization methods
        visualization_methods = [
            create_3d_sales_performance_scatter,
            create_3d_purchase_density,
            create_seasonal_sales_surface
        ]

        for method in visualization_methods:
            method_start = time.time()
            method()
            logger.info(f"{method.__name__} completed in {time.time() - method_start:.2f} seconds")

    def execute_analysis(self) -> None:
        """
        Execute the full analysis pipeline with performance tracking.
        """
        logger.info("Starting comprehensive retail data analysis...")

        # Load and preprocess data
        self.load_and_preprocess_data()

        # Generate visualizations
        visualization_methods = [
            self.create_parallel_coordinates,
            self.create_product_network,
            self.generate_advanced_3d_visualizations
        ]

        for method in visualization_methods:
            method_start = time.time()
            method()
            logger.info(f"{method.__name__} completed in {time.time() - method_start:.2f} seconds")

# analyzer = RetailDataAnalyzer('Online Retail.csv')
# analyzer.execute_analysis()