# Codtech — Data Analyst Internship

**Task 1: Big data analysis**

This notebook demonstrates scalable analysis using **Dask** (with a pandas fallback if Dask is unavailable). We use a synthetic dataset (~10,000 rows) that simulates user events (click/view/purchase/etc.). The notebook includes data loading, cleaning, aggregation, and some simple visualizations and insights.

Files that accompany this notebook:

- `codtech_bigdata_sample_10000.csv` — dataset (~10k rows)
- `codtech_bigdata_analysis_notebook.ipynb` — this notebook
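
The generator for the CSV is not included here. As a rough sketch only, a file with the columns this notebook reads (`timestamp`, `event_type`, `category`, `country`, `value`) could be produced as below. The function name `make_events`, the category and country labels, the event weights, the date range, and the value range are all assumptions for illustration, not the settings used for the real file.

```python
import numpy as np
import pandas as pd

def make_events(n_rows=10_000, seed=0):
    """Build a synthetic event table (hypothetical generator, not the original)."""
    rng = np.random.default_rng(seed)
    # Assumed labels and weights; the real file may use different ones.
    event_types = ['view', 'click', 'purchase', 'signup', 'refund']
    weights = [0.50, 0.30, 0.10, 0.07, 0.03]
    events = rng.choice(event_types, size=n_rows, p=weights)
    # Random timestamps spread over 30 days, at one-second resolution.
    offsets = rng.integers(0, 30 * 24 * 3600, size=n_rows)
    timestamps = pd.Timestamp('2024-01-01') + pd.to_timedelta(offsets, unit='s')
    # Only purchases carry a monetary value; other events are left missing.
    amounts = rng.uniform(5, 500, size=n_rows).round(2)
    value = np.where(events == 'purchase', amounts, np.nan)
    return pd.DataFrame({
        'timestamp': timestamps,
        'event_type': events,
        'category': rng.choice(['books', 'electronics', 'fashion'], size=n_rows),
        'country': rng.choice(['IN', 'US', 'GB', 'DE'], size=n_rows),
        'value': value,
    })

events_df = make_events(n_rows=1_000)
print(events_df.head())
```

Fixing the seed keeps the output reproducible, which helps when comparing the Dask and pandas code paths on identical input.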

---

In [None]:
# Try to import Dask. If unavailable, fall back to pandas and note limitations.
try:
    import dask.dataframe as dd
    dask_available = True
    print('Dask is available — will use Dask for scalable operations.')
except ImportError:
    dask_available = False
    print('Dask not available — falling back to pandas. To run with Dask, install it in your environment (pip install dask[complete]).')

import pandas as pd
import numpy as np
print('pandas version:', pd.__version__)

In [None]:
# Load the dataset (using Dask if available)
csv_path = r"/mnt/data/codtech_data_analytics/codtech_bigdata_sample_10000.csv"
if dask_available:
    df = dd.read_csv(csv_path, assume_missing=True, parse_dates=['timestamp'])
    # df.shape[0] on a Dask dataframe is a lazy object, not a number; len() triggers the count.
    print('Loaded with Dask dataframe. Rows:', len(df))
else:
    df = pd.read_csv(csv_path, parse_dates=['timestamp'])
    print('Loaded with pandas DataFrame. Rows:', len(df))

# Show a small sample
try:
    display(df.head())
except Exception:
    print(df.head().to_string())

In [None]:
# Basic cleaning & feature engineering
def prepare(df, dask_available):
    # The same column operations apply to Dask and pandas dataframes,
    # so dask_available is kept only for call compatibility.
    # Ensure correct dtypes
    for col in ['event_type', 'category', 'country']:
        df[col] = df[col].astype('category')
    df['value'] = df['value'].fillna(0).astype(float)
    # Extract date parts
    df['date'] = df['timestamp'].dt.date
    df['hour'] = df['timestamp'].dt.hour
    return df

df = prepare(df, dask_available)
print('Prepared dataframe — showing dtypes:')
try:
    print(df.dtypes)
except Exception:
    print(df.head())
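
One side effect of the `date` column above: `.dt.date` yields Python `date` objects, so the column has `object` dtype. Where a datetime dtype is preferred for later resampling or plotting, `.dt.normalize()` is an alternative. The sketch below uses a three-row frame made up for illustration; the `day` column is an option, not what this notebook does.

```python
import pandas as pd

# Tiny made-up sample standing in for the events table.
sample = pd.DataFrame({
    'timestamp': pd.to_datetime([
        '2024-01-01 09:15:00',
        '2024-01-01 21:40:00',
        '2024-01-02 00:05:00',
    ]),
})

# Approach used in the notebook: Python date objects, stored as object dtype.
sample['date'] = sample['timestamp'].dt.date
# Alternative: truncate to midnight and keep a datetime64 dtype.
sample['day'] = sample['timestamp'].dt.normalize()
sample['hour'] = sample['timestamp'].dt.hour

print(sample.dtypes)
# Either column defines the same daily groups.
print(sample.groupby('day').size())
```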

In [None]:
# Aggregations: daily event counts, plus purchase counts and total revenue by country
if dask_available:
    # event counts per day
    daily_events = df.groupby('date')['event_type'].count().compute().rename('events')
    # purchases and revenue by country
    purchases_country = df[df['event_type']=='purchase'].groupby('country').agg({'event_type':'count','value':'sum'}).compute()
    purchases_country = purchases_country.rename(columns={'event_type':'purchase_count','value':'total_revenue'}).sort_values('total_revenue', ascending=False)
else:
    daily_events = df.groupby('date')['event_type'].count().rename('events')
    purchases_country = df[df['event_type']=='purchase'].groupby('country').agg(purchase_count=('event_type','count'), total_revenue=('value','sum')).sort_values('total_revenue', ascending=False)

print('Top 5 days by event count:')
display(daily_events.sort_values(ascending=False).head())
print('\nPurchases & revenue by country:')
display(purchases_country.head())
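
The two branches above spell the same aggregation in two ways: a dict followed by `rename` in the Dask branch, and named aggregation in the pandas branch. As a quick check on a made-up five-row frame (pandas only, values invented for illustration), both forms give the same table:

```python
import pandas as pd

# Made-up events; only the purchase rows should contribute.
events = pd.DataFrame({
    'event_type': ['purchase', 'view', 'purchase', 'purchase', 'click'],
    'country': ['IN', 'IN', 'US', 'IN', 'US'],
    'value': [20.0, 0.0, 50.0, 5.0, 0.0],
})
purchases = events[events['event_type'] == 'purchase']

# Dict-style aggregation followed by a rename.
by_dict = (
    purchases.groupby('country')
    .agg({'event_type': 'count', 'value': 'sum'})
    .rename(columns={'event_type': 'purchase_count', 'value': 'total_revenue'})
    .sort_values('total_revenue', ascending=False)
)

# Named aggregation: output column names are given up front.
by_name = (
    purchases.groupby('country')
    .agg(purchase_count=('event_type', 'count'), total_revenue=('value', 'sum'))
    .sort_values('total_revenue', ascending=False)
)

print(by_name)
```

Named aggregation avoids the separate rename step, which is why it reads more directly when the backend supports it.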

In [None]:
# Category-level metrics: views -> purchases conversion estimate
if dask_available:
    cat_counts = df.groupby(['category','event_type']).size().compute().unstack(fill_value=0)
else:
    cat_counts = df.groupby(['category','event_type']).size().unstack(fill_value=0)

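# Ensure columns exist. Because event_type is categorical, the unstacked columns can
# form a CategoricalIndex, and adding labels outside its categories may raise in some
# pandas versions; converting the labels to plain strings avoids that.
cat_counts.columns = cat_counts.columns.astype(str)
for col in ['view','purchase','click','signup','refund']:
    if col not in cat_counts.columns:
        cat_counts[col] = 0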

cat_counts['purchase_rate_per_view'] = cat_counts['purchase'] / (cat_counts['view'].replace(0, np.nan))
cat_counts = cat_counts.sort_values('purchase_rate_per_view', ascending=False)
display(cat_counts[['view','purchase','purchase_rate_per_view']])
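
The `replace(0, np.nan)` step above keeps categories with no views from producing a division by zero: their rate becomes `NaN` instead of `inf`. A small sketch with invented counts (the category names are placeholders):

```python
import numpy as np
import pandas as pd

# Invented per-category event counts; 'toys' has purchases but no views.
counts = pd.DataFrame(
    {'view': [200, 50, 0], 'purchase': [10, 5, 2]},
    index=['books', 'electronics', 'toys'],
)

# Plain division gives inf for the zero-view row.
naive_rate = counts['purchase'] / counts['view']
# Replacing 0 with NaN first marks that row as undefined instead.
safe_rate = counts['purchase'] / counts['view'].replace(0, np.nan)

print(pd.DataFrame({'naive': naive_rate, 'safe': safe_rate}))
```

Sorting by the safe rate places the undefined rows last by default, so they do not appear as the best-converting categories.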

In [None]:
# Simple visualization: top countries by revenue (matplotlib)
import matplotlib.pyplot as plt

try:
    # purchases_country is already a pandas DataFrame in both branches
    # (the Dask result was computed in the aggregation cell).
    top = purchases_country.sort_values('total_revenue', ascending=False).head(10)
    ax = top['total_revenue'].plot(kind='bar', legend=False)
    ax.set_title('Top countries by total revenue')
    ax.set_ylabel('Total revenue')
    plt.tight_layout()
    plt.show()
except Exception as e:
    print('Plot failed:', e)

## Quick insights (automatically derived)

- The dataset simulates user events in which non-purchase events (clicks, views) outnumber purchases.
- Revenue is concentrated in countries with higher simulated purchase counts. Check the `purchases_country` table for exact figures.
- Category-level purchase rates (`purchase_rate_per_view`) highlight which categories convert best from views to purchases.
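
These statements can be checked against the data rather than taken on trust. A minimal sketch, assuming an events table with the same `event_type`, `country`, and `value` columns; the function name `summarize`, its `top_n` parameter, and the sample rows are hypothetical:

```python
import pandas as pd

def summarize(events, top_n=2):
    """Return the purchase share of events and the revenue share of the top countries."""
    is_purchase = events['event_type'] == 'purchase'
    # Fraction of all events that are purchases.
    purchase_share = is_purchase.mean()
    # Revenue per country, counting purchase rows only.
    revenue = events.loc[is_purchase].groupby('country')['value'].sum()
    top_share = revenue.nlargest(top_n).sum() / revenue.sum()
    return {'purchase_share': purchase_share, 'top_country_revenue_share': top_share}

# Hypothetical rows for illustration only.
events = pd.DataFrame({
    'event_type': ['view', 'click', 'purchase', 'view', 'purchase', 'view', 'purchase', 'click'],
    'country': ['IN', 'US', 'IN', 'DE', 'US', 'IN', 'DE', 'DE'],
    'value': [0.0, 0.0, 60.0, 0.0, 30.0, 0.0, 10.0, 0.0],
})

result = summarize(events)
print(result)
```

On the real dataset, the same function could be called on the pandas dataframe (or on a computed Dask result) to put numbers behind the first two bullets.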

---

### Notes

- This notebook uses **Dask** for scalable operations when it is installed and otherwise falls back to pandas, which handles ~10k rows comfortably but runs in memory on a single machine. To scale to millions of rows, run the notebook in an environment with Dask and adequate compute resources; using PySpark instead would require adapting the dataframe code to its API.
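
Between in-memory pandas and a Dask or PySpark setup there is a middle option that this notebook does not use: reading the CSV in chunks with plain pandas and combining partial aggregates. The sketch below is a different technique from Dask, shown only to illustrate partition-wise aggregation; `revenue_by_country` and the inline CSV are hypothetical.

```python
import io
import pandas as pd

def revenue_by_country(csv_source, chunksize=2):
    """Sum purchase revenue per country without loading the whole file at once."""
    partials = []
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):
        purchases = chunk[chunk['event_type'] == 'purchase']
        # Partial sums per chunk; sums can be combined safely afterwards.
        partials.append(purchases.groupby('country')['value'].sum())
    if not partials:
        return pd.Series(dtype=float, name='value')
    # Merge the per-chunk sums into one total per country.
    return pd.concat(partials).groupby(level=0).sum().sort_values(ascending=False)

# Inline stand-in for a large file (made-up rows).
csv_text = """event_type,country,value
purchase,IN,20
view,IN,
purchase,US,50
purchase,IN,5
click,US,
purchase,US,1.5
"""

totals = revenue_by_country(io.StringIO(csv_text), chunksize=2)
print(totals)
```

This works for sums and counts because they combine across chunks; statistics such as medians need a different approach, which is one reason to prefer Dask for anything beyond simple aggregates.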



In [None]:
# Save quick summary CSVs for the main outputs
out_dir = r"/mnt/data/codtech_data_analytics"
# Both results are pandas objects at this point (the Dask results were computed earlier),
# so one code path covers both cases.
daily_events.to_csv(out_dir + '/daily_events_summary.csv', index=True)
purchases_country.to_csv(out_dir + '/purchases_by_country_summary.csv')

print('Saved summary CSVs to', out_dir)