https://nycopendata.socrata.com/Social-Services/311-Service-Requests-from-2010-to-Present/erm2-nwe9/about_data

https://docs.python.org/3/library/sqlite3.html

In [79]:
import sqlite3
import dask.dataframe as dd
import mysql.connector as mysqlc
import pandas as pd

Plain csv

Tried to read the csv with pandas, but received MemoryError - thus using dask.

Dask uses parallel computing, operates on data in chunks.

In [122]:
cols_to_use = ["Agency", "Complaint Type", "Borough"]
cols_dtypes = {"Agency": "object", "Complaint Type": "object", "Borough": "object"}
data = dd.read_csv("../../311sr.csv", usecols=cols_to_use, dtype=cols_dtypes)

In [37]:
data.isna().sum().compute()

Agency                0
Complaint Type        0
Borough           47074
dtype: int64

In [41]:
data["Borough"].unique().compute()

0           QUEENS
1      Unspecified
2            BRONX
3         BROOKLYN
4        MANHATTAN
5    STATEN ISLAND
6              NaN
Name: Borough, dtype: object

In [42]:
data["Borough"] = data["Borough"].fillna("Unspecified")

In [43]:
data.isna().sum().compute()

Agency            0
Complaint Type    0
Borough           0
dtype: int64

In [44]:
coltypes = dict(data.dtypes)
coltypes

{'Agency': dtype('O'), 'Complaint Type': dtype('O'), 'Borough': dtype('O')}

In [45]:
%%time
most_common_complaint = data["Complaint Type"].value_counts().compute().idxmax()

CPU times: total: 3min 8s
Wall time: 1min 6s


In [46]:
print(most_common_complaint)

Noise - Residential


In [47]:
%%time
complaint_counts_by_borough = data.groupby("Borough")["Complaint Type"].value_counts().compute()
most_common_complaints = complaint_counts_by_borough.groupby("Borough").idxmax()

CPU times: total: 3min 4s
Wall time: 1min 4s


In [48]:
print(most_common_complaints)

Borough
BRONX                 (BRONX, Noise - Residential)
BROOKLYN           (BROOKLYN, Noise - Residential)
MANHATTAN         (MANHATTAN, Noise - Residential)
QUEENS                   (QUEENS, Illegal Parking)
STATEN ISLAND    (STATEN ISLAND, Street Condition)
Unspecified                 (Unspecified, HEATING)
Name: count, dtype: object


In [49]:
%%time
complaints_list = ["Noise - Residential", "Illegal Parking", "Street Condition", "HEATING"]
agencies_with_complaints = data[data["Complaint Type"].isin(complaints_list)]
agencies_with_complaints_count = agencies_with_complaints.groupby(["Agency", "Complaint Type"]).size().compute()

CPU times: total: 2min 57s
Wall time: 1min 1s


In [50]:
print(agencies_with_complaints_count)

Agency  Complaint Type     
DOT     Street Condition       1246723
HPD     HEATING                 887869
NYPD    Illegal Parking        2399699
        Noise - Residential    3303371
3-1-1   Illegal Parking              2
NYPD    Street Condition             6
DOITT   Noise - Residential          2
dtype: int64


SQLite

In [91]:
cols_to_use = ["Agency", "Complaint Type", "Borough"]
cols_dtypes = {"Agency": "object", "Complaint Type": "object", "Borough": "object"}
data = dd.read_csv("../../311sr.csv", usecols=cols_to_use, dtype=cols_dtypes)
data = data.rename(columns={'Complaint Type': 'ComplaintType'})
data["Borough"] = data["Borough"].fillna("Unspecified")

In [92]:
script = "CREATE TABLE mytable(Agency TEXT, ComplaintType TEXT, Borough TEXT)"
script2 = "INSERT INTO mytable VALUES(?, ?, ?)"

In [93]:
%%time
records = data.to_records(index=False).compute()
records = list(records) # create a list of tuples representing rows
con = sqlite3.connect("task2.db") # create a connection to database
cur = con.cursor() # create database cursor
cur.execute(script) # create table

CPU times: total: 1min 33s
Wall time: 1min 55s


<sqlite3.Cursor at 0x1fbcc5513c0>

In [94]:
res = cur.execute("SELECT name FROM sqlite_master") # check if the table is created
res.fetchone()

('mytable',)

In [95]:
for row in cur.execute("PRAGMA table_info(mytable)"): # check column types
    print(row)

(0, 'Agency', 'TEXT', 0, None, 0)
(1, 'ComplaintType', 'TEXT', 0, None, 0)
(2, 'Borough', 'TEXT', 0, None, 0)


In [96]:
records[:4] # check if records look correct - index included even though index=False 

[(0, 'NYPD', 'Noise - Residential', 'MANHATTAN'),
 (1, 'OSE', 'NonCompliance with Phased Reopening', 'BROOKLYN'),
 (2, 'DSNY', 'Overflowing Litter Baskets', 'BROOKLYN'),
 (3, 'NYPD', 'Noise - Residential', 'BROOKLYN')]

In [97]:
type(records[0])

numpy.record

In [98]:
%%time
records = [tuple(records[i])[1:4] for i in range(len(records))] # converting numpy.record to tuples
cur.executemany(script2, records) # populate table
con.commit()

CPU times: total: 2min 18s
Wall time: 2min 32s


In [102]:
check = cur.execute("SELECT Agency, Borough, ComplaintType FROM mytable LIMIT 5") # check if table is populated
check.fetchall()

[('NYPD', 'MANHATTAN', 'Noise - Residential'),
 ('OSE', 'BROOKLYN', 'NonCompliance with Phased Reopening'),
 ('DSNY', 'BROOKLYN', 'Overflowing Litter Baskets'),
 ('NYPD', 'BROOKLYN', 'Noise - Residential'),
 ('NYPD', 'BROOKLYN', 'Noise - Street/Sidewalk')]

In [100]:
%%time
most_common_complaint = cur.execute("""
    SELECT ComplaintType, COUNT(ComplaintType) AS Count
    FROM mytable
    GROUP BY ComplaintType
    ORDER BY Count DESC
    LIMIT 1;
""")

most_common_complaint.fetchone()

CPU times: total: 23.3 s
Wall time: 26 s


('Noise - Residential', 3304053)

In [103]:
%%time
most_common_complaints = cur.execute("""
    WITH RankedComplaints AS (
    SELECT
        Borough,
        ComplaintType,
        ROW_NUMBER() OVER(PARTITION BY Borough ORDER BY COUNT(*) DESC) as Rank
    FROM mytable
    GROUP BY Borough, ComplaintType
    )
    SELECT Borough, ComplaintType
    FROM RankedComplaints
    WHERE Rank = 1;
""")

most_common_complaints.fetchall()

CPU times: total: 44.3 s
Wall time: 52.9 s


[('BRONX', 'Noise - Residential'),
 ('BROOKLYN', 'Noise - Residential'),
 ('MANHATTAN', 'Noise - Residential'),
 ('QUEENS', 'Illegal Parking'),
 ('STATEN ISLAND', 'Street Condition'),
 ('Unspecified', 'HEATING')]

In [104]:
%%time
agencies_with_complaints_count = cur.execute("""
    SELECT Agency, ComplaintType, COUNT(*) AS Count
    FROM mytable
    WHERE ComplaintType IN ('Noise - Residential', 'Illegal Parking', 'Street Condition', 'HEATING')
    GROUP BY Agency, ComplaintType
    ORDER BY Agency, ComplaintType;
""")

agencies_with_complaints_count.fetchall()

CPU times: total: 10.8 s
Wall time: 12.1 s


[('3-1-1', 'Illegal Parking', 2),
 ('DOITT', 'Noise - Residential', 2),
 ('DOT', 'Street Condition', 1246403),
 ('HPD', 'HEATING', 887869),
 ('NYC311-PRD', 'Street Condition', 691),
 ('NYPD', 'Illegal Parking', 2401183),
 ('NYPD', 'Noise - Residential', 3304051),
 ('NYPD', 'Street Condition', 6)]

In [None]:
con.close()

MySQL

In [5]:
cols_to_use = ["Agency", "Complaint Type", "Borough"]
cols_dtypes = {"Agency": "object", "Complaint Type": "object", "Borough": "object"}
data = dd.read_csv("../../311sr.csv", usecols=cols_to_use, dtype=cols_dtypes)
data = data.rename(columns={'Complaint Type': 'ComplaintType'})
data["Borough"] = data["Borough"].fillna("Unspecified")

In [12]:
script = "CREATE TABLE mytable(Agency TEXT, ComplaintType TEXT, Borough TEXT)"
script2 = "INSERT INTO mytable (Agency, ComplaintType, Borough) VALUES(%s, %s, %s)"

In [7]:
%%time
records = data.to_records(index=False).compute()
records = list(records) # create a list of tuples representing rows
mydb = mysqlc.connect(host="localhost", user="root", password="1234", database="bigdata")
mycursor = mydb.cursor()
mycursor.execute(script)

CPU times: total: 1min 6s
Wall time: 1min 49s


In [19]:
mycursor.fetchall()

[('mytable',)]

In [29]:
query = """
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'bigdata' AND TABLE_NAME = 'mytable';
"""
mycursor.execute(query)

In [30]:
mycursor.fetchall()

[('Agency', 'text'), ('ComplaintType', 'text'), ('Borough', 'text')]

In [36]:
%%time
records = [tuple(records[i])[1:4] for i in range(len(records))] # converting numpy.record to tuples

CPU times: total: 1min 45s
Wall time: 1min 56s


In [72]:
%%time
for i in range(0, len(records), 10000): # inserting in batches, otherwise connection is lost
    batch = records[i:i+10000]
    mycursor.executemany(script2, batch) # populate table
    mydb.commit()

CPU times: total: 1min 54s
Wall time: 9min 23s


In [73]:
mycursor.execute("SELECT Agency, Borough, ComplaintType FROM mytable LIMIT 5")
mycursor.fetchall()

[('NYPD', 'MANHATTAN', 'Noise - Residential'),
 ('OSE', 'BROOKLYN', 'NonCompliance with Phased Reopening'),
 ('DSNY', 'BROOKLYN', 'Overflowing Litter Baskets'),
 ('NYPD', 'BROOKLYN', 'Noise - Residential'),
 ('NYPD', 'BROOKLYN', 'Noise - Street/Sidewalk')]

In [78]:
%%time
mycursor.execute("""
    SELECT ComplaintType, COUNT(ComplaintType) AS Count
    FROM mytable
    GROUP BY ComplaintType
    ORDER BY Count DESC
    LIMIT 1;
""")

mycursor.fetchall()

CPU times: total: 0 ns
Wall time: 3min 9s


[('Noise - Residential', 3304053)]

In [82]:
%%time
mycursor.execute("""
    WITH RankedComplaints AS (
    SELECT Borough, ComplaintType, ROW_NUMBER() OVER(PARTITION BY Borough ORDER BY COUNT(*) DESC) as R
    FROM mytable
    GROUP BY Borough, ComplaintType
    )
    SELECT Borough, ComplaintType
    FROM RankedComplaints
    WHERE R = 1;
""")

mycursor.fetchall()

CPU times: total: 0 ns
Wall time: 3min 44s


[('BRONX', 'Noise - Residential'),
 ('BROOKLYN', 'Noise - Residential'),
 ('MANHATTAN', 'Noise - Residential'),
 ('QUEENS', 'Illegal Parking'),
 ('STATEN ISLAND', 'Street Condition'),
 ('Unspecified', 'HEATING')]

In [83]:
%%time
mycursor.execute("""
    SELECT Agency, ComplaintType, COUNT(*) AS C
    FROM mytable
    WHERE ComplaintType IN ('Noise - Residential', 'Illegal Parking', 'Street Condition', 'HEATING')
    GROUP BY Agency, ComplaintType
    ORDER BY Agency, ComplaintType;
""")

mycursor.fetchall()

CPU times: total: 0 ns
Wall time: 45.8 s


[('3-1-1', 'Illegal Parking', 2),
 ('DOITT', 'Noise - Residential', 2),
 ('DOT', 'Street Condition', 1246403),
 ('HPD', 'HEATING', 887869),
 ('NYC311-PRD', 'Street Condition', 691),
 ('NYPD', 'Illegal Parking', 2401183),
 ('NYPD', 'Noise - Residential', 3304051),
 ('NYPD', 'Street Condition', 6)]

Results

In [107]:
query_time = pd.read_csv("queries.csv", delimiter=";")
query_time

Unnamed: 0,time,query,csv,sqlite,mysql
0,cpu,1,188,23.3,0.0
1,cpu,2,184,44.3,0.0
2,cpu,3,177,10.8,0.0
3,wall,1,66,26.0,189.0
4,wall,2,64,52.9,224.0
5,wall,3,61,12.1,45.8


In [106]:
print(f"CPU times total (sqlite): {93+138+23.3+44.3+10.8} s, Wall times total (sqlite): {115+152+26+52.9+12.1} s")

CPU times total (sqlite): 309.40000000000003 s, Wall times total (sqlite): 358.0 s


In [89]:
print(f"CPU times total (mysql): {66+105+114} s, Wall times total (mysql): {45.8+224+189+109+116+563} s")

CPU times total (mysql): 285 s, Wall times total (mysql): 1246.8 s


Improving SQL queries efficiency by indexing

SQLite

In [108]:
con = sqlite3.connect("task2.db")
cur = con.cursor()

In [109]:
cur.execute("CREATE INDEX idx_complaint_type ON mytable(ComplaintType);")

<sqlite3.Cursor at 0x1fbcc550840>

In [111]:
cur.execute("CREATE INDEX idx_agency ON mytable(Agency);")

<sqlite3.Cursor at 0x1fbcc550840>

In [113]:
cur.execute("CREATE INDEX idx_borough ON mytable(Borough);")

<sqlite3.Cursor at 0x1fbcc550840>

In [115]:
cur.execute("PRAGMA index_list(mytable)")
cur.fetchall()

[(0, 'idx_borough', 0, 'c', 0),
 (1, 'idx_agency', 0, 'c', 0),
 (2, 'idx_complaint_type', 0, 'c', 0)]

In [116]:
%%time
most_common_complaint = cur.execute("""
    SELECT ComplaintType, COUNT(ComplaintType) AS Count
    FROM mytable
    GROUP BY ComplaintType
    ORDER BY Count DESC
    LIMIT 1;
""")

most_common_complaint.fetchall()

CPU times: total: 4.38 s
Wall time: 5.82 s


[('Noise - Residential', 3304053)]

In [117]:
%%time
most_common_complaints = cur.execute("""
    WITH RankedComplaints AS (
    SELECT
        Borough,
        ComplaintType,
        ROW_NUMBER() OVER(PARTITION BY Borough ORDER BY COUNT(*) DESC) as Rank
    FROM mytable
    GROUP BY Borough, ComplaintType
    )
    SELECT Borough, ComplaintType
    FROM RankedComplaints
    WHERE Rank = 1;
""")

most_common_complaints.fetchall()

CPU times: total: 48.2 s
Wall time: 1min 14s


[('BRONX', 'Noise - Residential'),
 ('BROOKLYN', 'Noise - Residential'),
 ('MANHATTAN', 'Noise - Residential'),
 ('QUEENS', 'Illegal Parking'),
 ('STATEN ISLAND', 'Street Condition'),
 ('Unspecified', 'HEATING')]

In [118]:
%%time
agencies_with_complaints_count = cur.execute("""
    SELECT Agency, ComplaintType, COUNT(*) AS Count
    FROM mytable
    WHERE ComplaintType IN ('Noise - Residential', 'Illegal Parking', 'Street Condition', 'HEATING')
    GROUP BY Agency, ComplaintType
    ORDER BY Agency, ComplaintType;
""")

agencies_with_complaints_count.fetchall()

CPU times: total: 7.88 s
Wall time: 15.4 s


[('3-1-1', 'Illegal Parking', 2),
 ('DOITT', 'Noise - Residential', 2),
 ('DOT', 'Street Condition', 1246403),
 ('HPD', 'HEATING', 887869),
 ('NYC311-PRD', 'Street Condition', 691),
 ('NYPD', 'Illegal Parking', 2401183),
 ('NYPD', 'Noise - Residential', 3304051),
 ('NYPD', 'Street Condition', 6)]

In [119]:
con.close()

MySQL

In [120]:
mydb = mysqlc.connect(host="localhost", user="root", password="1234", database="bigdata")
mycursor = mydb.cursor()

In [142]:
mycursor.execute("CREATE INDEX idx_complaint_type ON mytable(ComplaintType(15));")

In [144]:
mycursor.execute("CREATE INDEX idx_agency ON mytable(Agency(15));")

In [145]:
mycursor.execute("CREATE INDEX idx_borough ON mytable(Borough(5));")

In [148]:
mycursor.execute("SHOW INDEX FROM mytable;")
indexes = mycursor.fetchall()
for index in indexes:
    print(index)

('mytable', 1, 'idx_complaint_type', 1, 'ComplaintType', 'A', 37674, 15, None, 'YES', 'BTREE', '', '', 'YES', None)
('mytable', 1, 'idx_agency', 1, 'Agency', 'A', 7628, 15, None, 'YES', 'BTREE', '', '', 'YES', None)
('mytable', 1, 'idx_borough', 1, 'Borough', 'A', 3572, 5, None, 'YES', 'BTREE', '', '', 'YES', None)


In [149]:
%%time
mycursor.execute("""
    SELECT ComplaintType, COUNT(ComplaintType) AS Count
    FROM mytable
    GROUP BY ComplaintType
    ORDER BY Count DESC
    LIMIT 1;
""")

mycursor.fetchall()

CPU times: total: 0 ns
Wall time: 3min 4s


[('Noise - Residential', 3304053)]

In [150]:
%%time
mycursor.execute("""
    WITH RankedComplaints AS (
    SELECT Borough, ComplaintType, ROW_NUMBER() OVER(PARTITION BY Borough ORDER BY COUNT(*) DESC) as R
    FROM mytable
    GROUP BY Borough, ComplaintType
    )
    SELECT Borough, ComplaintType
    FROM RankedComplaints
    WHERE R = 1;
""")

mycursor.fetchall()

CPU times: total: 0 ns
Wall time: 3min 42s


[('BRONX', 'Noise - Residential'),
 ('BROOKLYN', 'Noise - Residential'),
 ('MANHATTAN', 'Noise - Residential'),
 ('QUEENS', 'Illegal Parking'),
 ('STATEN ISLAND', 'Street Condition'),
 ('Unspecified', 'HEATING')]

In [151]:
%%time
mycursor.execute("""
    SELECT Agency, ComplaintType, COUNT(*) AS C
    FROM mytable
    WHERE ComplaintType IN ('Noise - Residential', 'Illegal Parking', 'Street Condition', 'HEATING')
    GROUP BY Agency, ComplaintType
    ORDER BY Agency, ComplaintType;
""")

mycursor.fetchall()

CPU times: total: 0 ns
Wall time: 40.9 s


[('3-1-1', 'Illegal Parking', 2),
 ('DOITT', 'Noise - Residential', 2),
 ('DOT', 'Street Condition', 1246403),
 ('HPD', 'HEATING', 887869),
 ('NYC311-PRD', 'Street Condition', 691),
 ('NYPD', 'Illegal Parking', 2401183),
 ('NYPD', 'Noise - Residential', 3304051),
 ('NYPD', 'Street Condition', 6)]

Results

In [152]:
query_time = pd.read_csv("queries2.csv", delimiter=";")
query_time

Unnamed: 0,time,query,csv,sqlite,mysql,sqlite_idx,mysql_idx
0,cpu,1,188,23.3,0.0,4.38,0.0
1,cpu,2,184,44.3,0.0,48.2,0.0
2,cpu,3,177,10.8,0.0,7.88,0.0
3,wall,1,66,26.0,189.0,5.82,184.0
4,wall,2,64,52.9,224.0,74.0,222.0
5,wall,3,61,12.1,45.8,15.4,40.9


In [153]:
mydb.close()