In [35]:
import numpy as np
import pandas as pd
import time
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

sns.set_style("darkgrid")

file_path = '311-service-requests-from-2010-to-present.csv'

In [178]:
def get_chunk_data(chunk_size=10000, nrows=None):
    chunk_data = pd.read_csv(
        file_path,
        chunksize=chunk_size,
        usecols=["Unique Key", "Borough", "Agency", "Complaint Type"],
        nrows=nrows,
        na_values=None)
    return chunk_data

# Task 1

In [192]:
%%time
chunk_data = get_chunk_data()

CPU times: user 2.62 ms, sys: 199 µs, total: 2.82 ms
Wall time: 2.33 ms


In [114]:
%%time
complaint_counter = Counter()
borough_complaint_counter = {}
agency_complaint_counter = {}
start_time = time.time()

for chunk in tqdm(chunk_data):
    complaint_counter.update(chunk['Complaint Type'])

    for borough, group in chunk.groupby('Borough'):
        if borough not in borough_complaint_counter:
            borough_complaint_counter[borough] = Counter()
        borough_complaint_counter[borough].update(group['Complaint Type'])

    for agency, group in chunk.groupby('Agency'):
        if agency not in agency_complaint_counter:
            agency_complaint_counter[agency] = Counter()
        agency_complaint_counter[agency].update(group['Complaint Type'])
        
end_time = time.time()
total_csv_time = end_time - start_time

most_common_complaints = complaint_counter.most_common(5)

most_common_complaints_borough = {borough: counter.most_common(1)[0] for borough, counter in borough_complaint_counter.items()}

most_common_complaints_agency = {agency: counter.most_common(1)[0] for agency, counter in agency_complaint_counter.items()}

2196it [03:47,  9.64it/s]

CPU times: user 3min 1s, sys: 19.8 s, total: 3min 21s
Wall time: 3min 47s





In [115]:
print(f"Most Common Complaints:")
pd.DataFrame(most_common_complaints)

Most Common Complaints:


Unnamed: 0,0,1
0,Illegal Parking,2484920
1,Noise - Residential,2087491
2,Blocked Driveway,1650587
3,Noise - Street/Sidewalk,1481688
4,For Hire Vehicle Complaint,901169


In [116]:
print(f"Most Common Complaints in Each Borough:")
pd.DataFrame(most_common_complaints_borough).T.sort_values(by=1, ascending=False).head()

Most Common Complaints in Each Borough:


Unnamed: 0,0,1
BROOKLYN,Illegal Parking,867437
QUEENS,Illegal Parking,765734
MANHATTAN,For Hire Vehicle Complaint,570985
BRONX,Noise - Street/Sidewalk,421929
STATEN ISLAND,Illegal Parking,133694


In [117]:
print(f"Agencies with the Most Common Complaints:")
pd.DataFrame(most_common_complaints_agency).T.sort_values(by=1, ascending=False).head()

Agencies with the Most Common Complaints:


Unnamed: 0,0,1
NYPD,Illegal Parking,2484920
TLC,For Hire Vehicle Complaint,901169
DOT,Street Condition,712470
DEP,Water System,557700
DOB,General Construction/Plumbing,488075


In [118]:
print(f"Total time taken for CSV processing: {total_csv_time:.2f}s")

Total time taken for CSV processing: 227.73s


# Task 2

In [119]:
from sqlalchemy import MetaData, create_engine, Table, Column, Integer, String, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

### SQLite

In [145]:
%%time
SQLITE_SQLALCHEMY_DATABASE_URL = "sqlite:///./sqlite.db"
# SQLITE_SQLALCHEMY_DATABASE_URL = "mysql+mysqlconnector://root:@127.0.0.1/Big_data_lsit_2"
chunk_data = get_chunk_data()

sqlite_engine = create_engine(SQLITE_SQLALCHEMY_DATABASE_URL, echo=False)
with sqlite_engine.connect() as connection:
    connection.execute(text("DROP TABLE IF EXISTS chunk_data"))
    connection.execute(text('PRAGMA journal_mode = OFF;'))
    for chunk in tqdm(chunk_data):
        connection.execute(text('BEGIN TRANSACTION;'))
        chunk.to_sql("chunk_data", sqlite_engine, if_exists="append", chunksize=1000)
        connection.execute(text('COMMIT;'))
    connection.execute(text('PRAGMA journal_mode = DELETE;'))


2196it [06:54,  5.30it/s]

CPU times: user 5min 36s, sys: 22.7 s, total: 5min 58s
Wall time: 6min 57s





In [148]:
def sql_task(engine):
    with engine.connect() as connection:
        # cursor = connection.cursor()
        sql_query = """
            SELECT
                `Complaint Type`, COUNT(*) AS ComplaintCount
            FROM
                chunk_data
            GROUP BY 
                `Complaint Type` 
            ORDER BY
                ComplaintCount
            DESC
            LIMIT 5;
            """
        sqllife_most_common_complaints = pd.read_sql_query(sql_query, connection)

        f_sql_query = """
           WITH
                ComplaintCounts AS(
                SELECT
                    {column},
                    `Complaint Type`,
                    COUNT(*) AS ComplaintCount
                FROM
                    chunk_data
                GROUP BY
                    {column},
                    `Complaint Type`
            ),
            MaxComplaintCounts AS(
                SELECT
                    {column},
                    MAX(ComplaintCount) AS MaxComplaintCount
                FROM
                    ComplaintCounts
                GROUP BY
                    {column}
                ORDER BY
                    MaxComplaintCount
                DESC
                LIMIT 5
            )

            SELECT
                c.{column},
                c.`Complaint Type`,
                c.ComplaintCount
            FROM
                ComplaintCounts AS c
            JOIN MaxComplaintCounts AS m ON
                c.{column} = m.{column} AND c.ComplaintCount = m.MaxComplaintCount
            ORDER BY
                c.ComplaintCount
            DESC;
            """

        sql_query = f_sql_query.format(column="Borough")
        sqllife_most_common_complaints_borough = pd.read_sql_query(sql_query, connection)
        
        sql_query = f_sql_query.format(column="Agency")
        sqllife_most_common_complaints_agency = pd.read_sql_query(sql_query, connection)
    return sqllife_most_common_complaints, sqllife_most_common_complaints_borough, sqllife_most_common_complaints_agency

In [149]:
%%time
sqlite_most_common_complaints, sqlite_most_common_complaints_borough, sqlite_most_common_complaints_agency = sql_task(sqlite_engine)

CPU times: user 1min 43s, sys: 11.8 s, total: 1min 55s
Wall time: 2min 22s


In [150]:
sqlite_most_common_complaints

Unnamed: 0,Complaint Type,ComplaintCount
0,Illegal Parking,2484920
1,Noise - Residential,2087491
2,Blocked Driveway,1650587
3,Noise - Street/Sidewalk,1481688
4,For Hire Vehicle Complaint,901169


In [151]:
sqlite_most_common_complaints_borough

Unnamed: 0,Borough,Complaint Type,ComplaintCount
0,BROOKLYN,Illegal Parking,867437
1,QUEENS,Illegal Parking,765734
2,MANHATTAN,For Hire Vehicle Complaint,570985
3,BRONX,Noise - Street/Sidewalk,421929
4,STATEN ISLAND,Illegal Parking,133694


In [152]:
sqlite_most_common_complaints_agency

Unnamed: 0,Agency,Complaint Type,ComplaintCount
0,NYPD,Illegal Parking,2484920
1,TLC,For Hire Vehicle Complaint,901169
2,DOT,Street Condition,712470
3,DEP,Water System,557700
4,DOB,General Construction/Plumbing,488075


### MYSQL

In [161]:

MYSQL_SQLALCHEMY_DATABASE_URL = "mysql+mysqlconnector://root:@127.0.0.1/Big_data_lsit_2"
mysql_engine = create_engine(
    MYSQL_SQLALCHEMY_DATABASE_URL, echo=False,
    pool_size=10,
    max_overflow=20,
)
chunk_data = get_chunk_data()

with mysql_engine.connect() as connection:
    connection.execute(text("DROP TABLE IF EXISTS chunk_data"))
    # connection.execute(text('PRAGMA journal_mode = OFF;'))
    for chunk in tqdm(chunk_data):
        # connection.execute(text('BEGIN TRANSACTION;'))
        chunk.to_sql("chunk_data", mysql_engine, if_exists="append", chunksize=1000)
        # connection.execute(text('COMMIT;'))
    # connection.execute(text('PRAGMA journal_mode = DELETE;'))


2196it [20:47,  1.76it/s]


In [162]:
%%time
mysql_most_common_complaints, mysql_most_common_complaints_borough, mysql_most_common_complaints_agency = sql_task(mysql_engine)

CPU times: user 36.2 ms, sys: 918 µs, total: 37.1 ms
Wall time: 8min 59s


In [165]:
mysql_most_common_complaints

Unnamed: 0,Complaint Type,ComplaintCount
0,Illegal Parking,2484920
1,Noise - Residential,2087491
2,Blocked Driveway,1650587
3,Noise - Street/Sidewalk,1481688
4,For Hire Vehicle Complaint,901169


In [166]:
mysql_most_common_complaints_borough

Unnamed: 0,Borough,Complaint Type,ComplaintCount
0,BROOKLYN,Illegal Parking,867437
1,QUEENS,Illegal Parking,765734
2,MANHATTAN,For Hire Vehicle Complaint,570985
3,BRONX,Noise - Street/Sidewalk,421929
4,STATEN ISLAND,Illegal Parking,133694


## Task 3

In [173]:
from multiprocessing import Pool


def execute_query(engine, sql_query):
    with engine.connect() as connection:
        return pd.read_sql_query(sql_query, connection)


f_sql_query = """
    WITH
        ComplaintCounts AS(
        SELECT
            {column},
            `Complaint Type`,
            COUNT(*) AS ComplaintCount
        FROM
            chunk_data
        GROUP BY
            {column},
            `Complaint Type`
    ),
    MaxComplaintCounts AS(
        SELECT
            {column},
            MAX(ComplaintCount) AS MaxComplaintCount
        FROM
            ComplaintCounts
        GROUP BY
            {column}
        ORDER BY
            MaxComplaintCount
        DESC
        LIMIT 5
    )

    SELECT
        c.{column},
        c.`Complaint Type`,
        c.ComplaintCount
    FROM
        ComplaintCounts AS c
    JOIN MaxComplaintCounts AS m ON
        c.{column} = m.{column} AND c.ComplaintCount = m.MaxComplaintCount
    ORDER BY
        c.ComplaintCount
    DESC;
    """
queries = [
    """
        SELECT
            `Complaint Type`, COUNT(*) AS ComplaintCount
        FROM
            chunk_data
        GROUP BY 
            `Complaint Type` 
        ORDER BY
            ComplaintCount
        DESC
        LIMIT 5;
        """,
    f_sql_query.format(column="Borough"),
    f_sql_query.format(column="Agency")
]


In [175]:
%%time
def sqlite_pool(query):
    return execute_query(sqlite_engine, query)

with Pool(3) as pool:
    results = list(tqdm(pool.imap(sqlite_pool, queries), total=len(queries)))

100%|██████████| 3/3 [01:17<00:00, 25.96s/it]

CPU times: user 33.4 ms, sys: 68.4 ms, total: 102 ms
Wall time: 1min 18s





In [176]:
from concurrent.futures import ThreadPoolExecutor

def execute_query(engine, sql_query):
    with engine.connect() as connection:
        return pd.read_sql_query(sql_query, connection)


def run_queries_concurrently(queries):
    def mysql_pool(query):
        return execute_query(mysql_engine, query)
    
    with ThreadPoolExecutor(max_workers=len(queries)) as executor:
        results = list(tqdm(executor.map(mysql_pool, queries), total=len(queries)))
    return results

In [177]:
%%time
results = run_queries_concurrently(queries)

0it [00:00, ?it/s]

3it [10:10, 203.52s/it]

CPU times: user 54.6 ms, sys: 4.51 ms, total: 59.1 ms
Wall time: 10min 10s



