In [17]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error, r2_score, silhouette_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from io import BytesIO
import matplotlib.backends.backend_pdf as pdf_backend
import unittest
from data_processor import DataProcessor
# Read the medal-table workbook into the notebook-wide ``df`` used by later cells.
# NOTE(review): this is an absolute path on one user's machine, so the cell only
# runs where that exact file exists -- consider a configurable data directory.
df=pd.read_excel('C:/Users/kalya/Documents/PycharmProjects/FeatureEngineering/olympics2024.xlsx')
import warnings
# Install an "ignore" filter for the base ``Warning`` category, which matches
# every warning subclass raised after this point (including deprecations).
warnings.filterwarnings("ignore", category=Warning)

In [18]:
df.head()

Unnamed: 0,Rank,Country,Country Code,Gold,Silver,Bronze,Total
0,1,United States,US,40,44,42,126
1,2,China,CHN,40,27,24,91
2,3,Japan,JPN,20,12,13,45
3,4,Australia,AUS,18,19,16,53
4,5,France,FRA,16,26,22,64


In [20]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 91 entries, 0 to 90
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   Rank          91 non-null     int64 
 1   Country       91 non-null     object
 2   Country Code  91 non-null     object
 3   Gold          91 non-null     int64 
 4   Silver        91 non-null     int64 
 5   Bronze        91 non-null     int64 
 6   Total         91 non-null     int64 
dtypes: int64(5), object(2)
memory usage: 5.1+ KB


## 1.1 Data Ingestion

In [13]:
#Load the data based on file type
class DataIngestion:
    """Read a tabular file into ``self.df`` at construction time.

    Attributes:
        file_path: Location (or file-like object) handed to the pandas reader.
        file_type: Lower-cased format name; 'csv', 'json' and 'excel' are supported.
        df: The loaded DataFrame, or ``None`` when loading failed.
    """

    def __init__(self, file_path, file_type):
        self.file_path = file_path
        self.file_type = file_type.lower()
        self.df = self.load_data()

    def load_data(self):
        """Load ``self.file_path`` with the reader matching ``self.file_type``.

        Returns:
            The DataFrame produced by the pandas reader, or ``None`` if the
            type is unsupported or the reader raised. In the failure case the
            error is printed rather than propagated.
        """
        try:
            # Format name -> pandas reader that understands it.
            readers = {
                'csv': pd.read_csv,
                'json': pd.read_json,
                'excel': pd.read_excel,
            }
            if self.file_type not in readers:
                raise ValueError(f"Unsupported file type: {self.file_type}")
            return readers[self.file_type](self.file_path)
        except Exception as e:
            # Best-effort loader: report the problem and hand back None.
            print(f"Error loading data: {e}")
            return None
           

## 1.2 Data Cleaning

In [None]:
class DataProcessor:
    """Clean a medal-table DataFrame: de-duplicate, impute, drop outliers, fix dtypes.

    The frame passed to the constructor is stored as ``self.df``.
    ``clean_data`` rebinds ``self.df`` to new frames instead of editing the
    caller's object in place, so re-running the cell starts from the same input.
    """

    def __init__(self, df):
        self.df = df

    def clean_data(self):
        """Run the full cleaning pipeline and return the cleaned frame.

        Steps, in order:
          1. Drop exact duplicate rows.
          2. Fill missing numeric values with the column mean.
          3. For each numeric column in turn, drop rows whose |z-score| >= 3.
             Each column's z-scores are computed on the rows that survived the
             previous columns' filters.
          4. Reset the index and coerce the known columns to their expected types.

        Returns:
            The cleaned DataFrame (also stored in ``self.df``).
        """
        # Remove duplicates without touching the caller's frame.
        self.df = self.df.drop_duplicates()

        # Impute with the mean. numeric_only=True is required: with text
        # columns such as 'Country' present, a plain mean() raises TypeError
        # on current pandas.
        self.df = self.df.fillna(self.df.mean(numeric_only=True))

        # Handle outliers (z-score method).
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if self.df.empty:
                break  # nothing left to score
            values = self.df[col].astype(float).to_numpy()
            # A constant (or all-NaN) column has undefined z-scores (NaN);
            # silence the 0/0 warning and treat such rows as "not an outlier"
            # instead of letting `NaN < 3` discard the whole table.
            with np.errstate(invalid='ignore', divide='ignore'):
                z_scores = np.abs(stats.zscore(values))
            keep = np.isnan(z_scores) | (z_scores < 3)
            self.df = self.df[keep]
        # Reset index after filtering; returns a fresh frame that is safe to assign into.
        self.df = self.df.reset_index(drop=True)

        # Validate correct data types
        self.validate_data_types()

        return self.df

    def validate_data_types(self):
        """Coerce the known medal-table columns of ``self.df`` to their expected types.

        Integer columns are parsed with ``pd.to_numeric``; unparseable or
        missing entries become 0 before the cast. String columns are cast with
        ``astype(str)``. A message is printed for every expected column that is
        absent. The frame is modified in place and nothing is returned.
        """
        correct_dtypes = {
            'Rank': 'int',
            'Country': 'str',
            'Country Code': 'str',
            'Gold': 'int',
            'Silver': 'int',
            'Bronze': 'int',
            'Total': 'int'
        }

        for column, dtype in correct_dtypes.items():
            if column not in self.df.columns:
                print(f"Column {column} not found in DataFrame")
                continue
            if dtype == 'int':
                self.df[column] = pd.to_numeric(self.df[column], errors='coerce').fillna(0).astype(int)
            elif dtype == 'str':
                self.df[column] = self.df[column].astype(str)
            else:
                print(f"Unknown type for column: {column}")
        
   


## 1.3 Data Preprocessing

In [None]:
    def preprocess_data(self):
        """Label-encode text columns, then standardize every numeric column.

        Numeric columns are selected *after* encoding, so the freshly encoded
        categorical columns are standardized together with the original
        numeric ones. ``self.df`` is modified in place.

        Returns:
            ``self.df`` after encoding and scaling. A frame with no rows or no
            columns is returned unchanged, since the scaler cannot be fitted
            on zero samples.
        """
        # Nothing to encode or fit on; fitting the scaler here would raise.
        if self.df.empty:
            return self.df

        # Convert categorical columns to numerical
        categorical_cols = self.df.select_dtypes(include=[object]).columns
        for col in categorical_cols:
            self.df[col] = LabelEncoder().fit_transform(self.df[col])

        # Standardize numerical columns (skip when the frame has none,
        # e.g. only boolean/datetime columns).
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            self.df[numeric_cols] = StandardScaler().fit_transform(self.df[numeric_cols])

        return self.df

## 2.1 Analysis Engine

In [9]:
class AnalysisEngine:
    """Summary statistics and saved plots for a medal-table DataFrame.

    Every ``plot_*`` method writes a PNG into the current working directory
    and closes the figure afterwards; none of them returns a value.
    """

    def __init__(self, df):
        self.df = df

    def generate_statistics(self):
        """Return ``self.df.describe()`` (summary statistics)."""
        return self.df.describe()

    def plot_medal_counts_distribution(self):
        """Save histograms of Gold/Silver/Bronze/Total to 'medal_counts_distribution.png'."""
        # The body must be indented under the def; at class level it was an
        # IndentationError that prevented the whole class from being defined.
        self.df[['Gold', 'Silver', 'Bronze', 'Total']].hist(bins=10, figsize=(10, 7))
        plt.suptitle('Distribution of Medal Counts')
        plt.xlabel('Number of Medals')
        plt.ylabel('Frequency')
        plt.savefig('medal_counts_distribution.png')
        plt.close()

    def plot_total_medals_by_country(self):
        """Save a bar chart of Total per Country to 'total_medals_by_country.png'."""
        sns.barplot(x='Country', y='Total', data=self.df)
        plt.xticks(rotation=45)
        plt.title('Total Medals by Country')
        plt.xlabel('Country')
        plt.ylabel('Total Medals')
        plt.savefig('total_medals_by_country.png')
        plt.close()

    def plot_medal_breakdown(self):
        """Save a stacked Gold/Silver/Bronze bar chart per Country to 'medal_breakdown.png'."""
        self.df.set_index('Country')[['Gold', 'Silver', 'Bronze']].plot(kind='bar', stacked=True, figsize=(10, 7))
        plt.title('Medal Breakdown by Country')
        plt.xlabel('Country')
        plt.ylabel('Number of Medals')
        plt.xticks(rotation=45)
        plt.savefig('medal_breakdown.png')
        plt.close()

    def plot_correlation_matrix(self):
        """Save an annotated heatmap of medal-column correlations to 'correlation_matrix.png'."""
        correlation_matrix = self.df[['Gold', 'Silver', 'Bronze', 'Total']].corr()
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm')
        plt.title('Correlation Matrix')
        plt.savefig('correlation_matrix.png')
        plt.close()

    def plot_rank_vs_total_medals(self):
        """Save a scatter plot of Rank against Total to 'rank_vs_total_medals.png'."""
        sns.scatterplot(x='Rank', y='Total', data=self.df)
        plt.title('Rank vs. Total Medals')
        plt.xlabel('Rank')
        plt.ylabel('Total Medals')
        plt.savefig('rank_vs_total_medals.png')
        plt.close()


## 2.2 The Models

In [None]:
   # Linear Regression
    def regression(self):
        """Fit a linear model predicting Total from Gold, Silver and Bronze.

        The rows of ``self.df`` are split 80/20 with a fixed seed (42); the
        model is fitted on the larger part and scored on the held-out part.

        Returns:
            Tuple ``(coefficients, intercept, mse, r2)`` where the two metrics
            are computed on the held-out rows.
        """
        features = self.df[['Gold', 'Silver', 'Bronze']]
        target = self.df['Total']

        split = train_test_split(features, target, test_size=0.2, random_state=42)
        features_train, features_test, target_train, target_test = split

        fitted = LinearRegression()
        fitted.fit(features_train, target_train)

        predicted = fitted.predict(features_test)
        held_out_mse = mean_squared_error(target_test, predicted)
        held_out_r2 = r2_score(target_test, predicted)

        return fitted.coef_, fitted.intercept_, held_out_mse, held_out_r2

    # K-Means Clustering
    def clustering(self, n_clusters=3):
        """Group rows with K-Means on the Gold/Silver/Bronze columns.

        Adds (or overwrites) a ``Cluster`` column on ``self.df`` holding each
        row's cluster label.

        Args:
            n_clusters: Number of clusters passed to ``KMeans``.

        Returns:
            Tuple ``(frame, silhouette)`` where ``frame`` is the
            ``Country``/``Cluster`` columns of ``self.df`` and ``silhouette``
            is the silhouette score of the labelling.
        """
        medal_features = self.df[['Gold', 'Silver', 'Bronze']]

        estimator = KMeans(n_clusters=n_clusters, random_state=42)
        labels = estimator.fit_predict(medal_features)
        self.df['Cluster'] = labels

        score = silhouette_score(medal_features, self.df['Cluster'])

        assignments = self.df[['Country', 'Cluster']]
        return assignments, score

    # Decision Tree Regression
    def decision_tree_regression(self):
        """Fit a decision-tree regressor predicting Total from Gold, Silver and Bronze.

        Uses the same 80/20 split and seed (42) as the linear regression, and
        a seeded ``DecisionTreeRegressor``.

        Returns:
            Tuple ``(model, mse, r2)``: the fitted estimator and its metrics on
            the held-out rows.
        """
        features = self.df[['Gold', 'Silver', 'Bronze']]
        target = self.df['Total']

        split = train_test_split(features, target, test_size=0.2, random_state=42)
        features_train, features_test, target_train, target_test = split

        tree = DecisionTreeRegressor(random_state=42)
        tree.fit(features_train, target_train)

        predicted = tree.predict(features_test)
        held_out_mse = mean_squared_error(target_test, predicted)
        held_out_r2 = r2_score(target_test, predicted)

        return tree, held_out_mse, held_out_r2

## Report Generation

In [None]:
def generate_report(self):
    """Write ``analysis_report.pdf`` plus two plain-text result files.

    PDF pages, in order: a bar chart of ``generate_statistics()``, one page
    per saved plot image, the linear-regression coefficients, and a
    Gold/Silver scatter coloured by cluster. Regression metrics are written to
    ``regression_results.txt`` and decision-tree metrics to
    ``decision_tree_results.txt``. All files land in the current working
    directory.

    Side effects: the plot methods each (re)write their PNG file, and
    ``self.clustering()`` adds a ``Cluster`` column to ``self.df``.

    NOTE(review): this is defined at module level in this cell although it
    takes ``self`` and calls engine methods; presumably it is meant to be a
    method of ``AnalysisEngine`` -- confirm how it gets attached.
    """
    # Each plotting method paired with the PNG file that method saves. The
    # image must be looked up by this file name; the class name is not a file.
    plot_outputs = [
        (self.plot_medal_counts_distribution, 'medal_counts_distribution.png'),
        (self.plot_total_medals_by_country, 'total_medals_by_country.png'),
        (self.plot_medal_breakdown, 'medal_breakdown.png'),
        (self.plot_correlation_matrix, 'correlation_matrix.png'),
        (self.plot_rank_vs_total_medals, 'rank_vs_total_medals.png'),
    ]

    # The context manager closes the PDF even when one of the steps raises.
    with pdf_backend.PdfPages("analysis_report.pdf") as pdf_pages:
        # Summary statistics page. Named `summary` so the scipy `stats`
        # module is not shadowed inside this function.
        summary = self.generate_statistics()
        ax = summary.plot(kind='bar', figsize=(10, 7))
        ax.set_title('Summary Statistics')
        ax.set_xlabel('Features')
        ax.set_ylabel('Values')
        fig = ax.get_figure()
        # PdfPages.savefig expects a Figure (not a byte buffer), so the
        # figure itself is handed over.
        pdf_pages.savefig(fig, bbox_inches='tight')
        plt.close(fig)

        # Add Plots: regenerate each PNG, then embed the image as a page.
        for plot_method, image_path in plot_outputs:
            plot_method()
            fig, ax = plt.subplots(figsize=(10, 7))
            ax.imshow(plt.imread(image_path))
            ax.axis('off')
            ax.set_title(plot_method.__name__)
            pdf_pages.savefig(fig, bbox_inches='tight')
            plt.close(fig)

        # Add Regression Results
        coef, intercept, mse, r2 = self.regression()
        fig, ax = plt.subplots(figsize=(10, 7))
        ax.bar(range(len(coef)), coef)
        ax.set_title('Linear Regression Coefficients')
        ax.set_xlabel('Medal Type')
        ax.set_ylabel('Coefficient')
        pdf_pages.savefig(fig, bbox_inches='tight')
        plt.close(fig)

        with open("regression_results.txt", "w") as f:
            f.write(f"Linear Regression Coefficients: {coef}\n")
            f.write(f"Intercept: {intercept}\n")
            f.write(f"Mean Squared Error: {mse}\n")
            f.write(f"R-squared: {r2}\n")

        # Add Clustering Results (the call adds the 'Cluster' column used as hue).
        _, silhouette_avg = self.clustering()
        fig, ax = plt.subplots()
        sns.scatterplot(x='Gold', y='Silver', hue='Cluster', data=self.df, ax=ax)
        ax.set_title(f'Clusters (Silhouette Score: {silhouette_avg:.2f})')
        pdf_pages.savefig(fig, bbox_inches='tight')
        plt.close(fig)

        # Add Decision Tree Regression Results
        _, dt_mse, dt_r2 = self.decision_tree_regression()
        with open("decision_tree_results.txt", "w") as f:
            f.write(f"Decision Tree MSE: {dt_mse}\n")
            f.write(f"Decision Tree R-squared: {dt_r2}\n")

    print("Report generated successfully!")

## CLI

In [None]:
def run_cli(engine):
    """Run an interactive command loop against an analysis engine.

    Prints the menu once, then reads commands until ``exit`` is entered or the
    input stream ends. A command may be typed by name (case-insensitive,
    surrounding whitespace ignored) or by its menu number, e.g. ``1`` for
    ``statistics``.

    Args:
        engine: Object providing the methods invoked below
            (``generate_statistics``, the ``plot_*`` methods, ``regression``,
            ``clustering``, ``decision_tree_regression``, ``generate_report``).
            Only the method for the chosen command is looked up.
    """
    # Menu order; the 1-based position doubles as a numeric shortcut.
    command_names = (
        "statistics",
        "plot_distribution",
        "plot_medals_by_country",
        "plot_medal_breakdown",
        "plot_correlation_matrix",
        "plot_rank_vs_total_medals",
        "regression",
        "clustering",
        "decision_tree",
        "report",
        "exit",
    )
    numeric_aliases = {str(number): name for number, name in enumerate(command_names, start=1)}

    # Plot command -> (engine method to call, file name reported to the user).
    plot_commands = {
        "plot_distribution": ("plot_medal_counts_distribution", "medal_counts_distribution.png"),
        "plot_medals_by_country": ("plot_total_medals_by_country", "total_medals_by_country.png"),
        "plot_medal_breakdown": ("plot_medal_breakdown", "medal_breakdown.png"),
        "plot_correlation_matrix": ("plot_correlation_matrix", "correlation_matrix.png"),
        "plot_rank_vs_total_medals": ("plot_rank_vs_total_medals", "rank_vs_total_medals.png"),
    }

    print("Welcome to the Data Analysis CLI!")
    print("Available commands:")
    print("1. statistics - Generate summary statistics")
    print("2. plot_distribution - Plot distribution of medal counts")
    print("3. plot_medals_by_country - Plot total medals by country")
    print("4. plot_medal_breakdown - Plot medal breakdown")
    print("5. plot_correlation_matrix - Plot correlation matrix")
    print("6. plot_rank_vs_total_medals - Plot rank vs. total medals")
    print("7. regression - Perform linear regression")
    print("8. clustering - Perform K-Means clustering")
    print("9. decision_tree - Perform Decision Tree regression")
    print("10. report - Generate comprehensive report")
    print("11. exit - Exit the program")

    while True:
        try:
            command = input("\nEnter command: ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted prompt ends the session cleanly
            # instead of surfacing a traceback.
            print("\nExiting the program.")
            break

        # Accept the menu number as a synonym for the command name.
        command = numeric_aliases.get(command, command)

        if command in plot_commands:
            method_name, saved_file = plot_commands[command]
            getattr(engine, method_name)()
            print(f"Plot saved as '{saved_file}'")
        elif command == "statistics":
            print(engine.generate_statistics())
        elif command == "regression":
            coef, intercept, mse, r2 = engine.regression()
            print(f"Coefficients: {coef}")
            print(f"Intercept: {intercept}")
            print(f"Mean Squared Error: {mse}")
            print(f"R-squared: {r2}")
        elif command == "clustering":
            clusters, silhouette_avg = engine.clustering()
            print(clusters)
            print(f"Silhouette Score: {silhouette_avg}")
        elif command == "decision_tree":
            _, mse, r2 = engine.decision_tree_regression()
            print(f"Decision Tree MSE: {mse}")
            print(f"Decision Tree R-squared: {r2}")
        elif command == "report":
            engine.generate_report()
        elif command == "exit":
            print("Exiting the program.")
            break
        else:
            print("Unknown command. Please try again.")

## Documentation and Testing

In [None]:
class TestDataProcessor(unittest.TestCase):
    """Smoke tests for the model methods called through ``self.engine``.

    Assumes ``AnalysisEngine`` exposes ``regression``, ``clustering`` and
    ``decision_tree_regression`` -- NOTE(review): confirm those methods are
    defined on the class in the executed notebook.
    """

    def setUp(self):
        # Self-contained fixture: three well-separated medal tiers, with enough
        # rows for an 80/20 split and for scoring a 3-cluster solution (the
        # silhouette score needs fewer clusters than samples).
        self.df = pd.DataFrame({
            'Rank': [1, 2, 3, 4, 5, 6, 7, 8, 9],
            'Country': ['United States', 'China', 'Japan', 'Australia', 'France',
                        'Netherlands', 'Great Britain', 'South Korea', 'Italy'],
            'Country Code': ['US', 'CHN', 'JPN', 'AUS', 'FRA', 'NED', 'GBR', 'KOR', 'ITA'],
            'Gold': [40, 38, 36, 20, 18, 16, 3, 2, 1],
            'Silver': [44, 40, 35, 19, 17, 15, 4, 2, 1],
            'Bronze': [42, 39, 33, 16, 18, 14, 5, 3, 2],
            'Total': [126, 117, 104, 55, 53, 45, 12, 7, 4]
        })
        # Built from the fixture rather than the notebook-global `df`, so the
        # tests do not depend on the Excel file being loaded.
        self.processor = DataProcessor(self.df.copy())
        # The tests below all go through `self.engine`; it has to exist.
        self.engine = AnalysisEngine(self.df.copy())

    def test_regression(self):
        coef, intercept, mse, r2 = self.engine.regression()
        self.assertEqual(len(coef), 3)  # Ensure we have 3 coefficients
        self.assertTrue(isinstance(intercept, float))
        self.assertTrue(isinstance(mse, float))
        self.assertTrue(isinstance(r2, float))

    def test_clustering(self):
        clusters, silhouette_avg = self.engine.clustering(n_clusters=3)
        self.assertEqual(clusters.shape[1], 2)  # 'Country' and 'Cluster' columns
        # The silhouette coefficient ranges over [-1, 1], not [0, 1].
        self.assertTrue(-1 <= silhouette_avg <= 1)

    def test_decision_tree_regression(self):
        model, mse, r2 = self.engine.decision_tree_regression()
        self.assertTrue(hasattr(model, 'predict'))
        self.assertTrue(isinstance(mse, float))
        self.assertTrue(isinstance(r2, float))

if __name__ == '__main__':
    # Inside a notebook kernel, sys.argv holds the kernel's own arguments and
    # unittest.main() finishes by calling sys.exit(). Passing a placeholder
    # argv and exit=False lets the tests run and report without either problem.
    unittest.main(argv=['first-arg-is-ignored'], exit=False)
