## 2. Working with files

<img src="https://tuplex.cs.brown.edu/_static/img/logo.png" width="128px" style="float: right;" />

In the second part of the Tuplex intro series, we'll look at how to work with CSV and text files. First, let's install Tuplex in our notebook again.

In [1]:
!python3 --version

Python 3.7.13


In [2]:
# # install Colab compatible upgrades to avoid dependency errors
# !pip install -q folium==0.2.1
# !pip install -q --upgrade urllib3==1.25.11
# !pip install flask-socketio flask-pymongo eventlet==0.30.0
# !pip uninstall jedi -y && pip3 install 'jedi>=0.10'

# # install Tuplex
# #!pip install -q -i https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple tuplex==0.3.2rc1
# #!pip install -q tuplex

# # !pip install -i https://test.pypi.org/simple/ tuplex==0.3.dev20220822143933006789
# #!pip install -i https://test.pypi.org/simple/ tuplex

In [3]:
# downloads temp tuplex file
#!gdown https://drive.google.com/uc?id=1-TxhNpVg6TW96rNvLWv_2NWUz2tdoLnN

In [4]:
#!pip3 install --force-reinstall /content/tuplex-0.3.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

In [5]:
!pip3 install wheelhouse/tuplex-0.3.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

Processing ./wheelhouse/tuplex-0.3.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Collecting flask-pymongo
  Downloading Flask_PyMongo-2.3.0-py2.py3-none-any.whl (12 kB)
Collecting iso8601
  Using cached iso8601-1.0.2-py3-none-any.whl (9.7 kB)
Collecting Flask==2.0.2
  Downloading Flask-2.0.2-py3-none-any.whl (95 kB)
Collecting pluggy
  Downloading pluggy-1.0.0-py2.py3-none-any.whl (13 kB)
Collecting astor
  Using cached astor-0.8.1-py2.py3-none-any.whl (27 kB)
Collecting flask-socketio
  Downloading Flask_SocketIO-5.3.1-py3-none-any.whl (17 kB)
Collecting eventlet==0.30.0
  Downloading eventlet-0.30.0-py2.py3-none-any.whl (224 kB)

### 2.1 Basic IO - Reading CSV files
To read a CSV file, Tuplex provides the API function `csv`.

In [6]:
import tuplex

c = tuplex.Context({'tuplex.redirectToPythonLogging':False})

ERROR:root:Failed to start or connect to Tuplex WebUI. Details: MongoDB (mongod) not found on PATH. In order to use Tuplex's WebUI, you need MongoDB installed or point the framework to a running MongoDB instance


Welcome to

  _____            _
 |_   _|   _ _ __ | | _____  __
   | || | | | '_ \| |/ _ \ \/ /
   | || |_| | |_) | |  __/>  <
   |_| \__,_| .__/|_|\___/_/\_\ 0.3.3
            |_|
    
using Python 3.7.13 (default, Sep 12 2022, 22:16:36) 
[GCC 11.2.0] on linux
[2022-09-12 22:44:45.614] [local ee] [info] loaded runtime library from/home/leonhard/.pyenv/versions/3.7.13/lib/python3.7/site-packages/tuplex/libexec/tuplex_runtime.cpython-37m-x86_64-linux-gnu.so
[2022-09-12 22:44:45.614] [local ee] [info] initializing LLVM backend
[2022-09-12 22:44:45.615] [LLVM] [info] compiling code for skylake
[2022-09-12 22:44:45.619] [memory] [info] allocated bitmap managed memory region (1.00 GB, 32.00 MB block size)
[2022-09-12 22:44:45.619] [E/1] [info] provided cache path file:///tmp/tuplex-cache-leonhard/E1 does not exist. Attempting to create it.
[2022-09-12 22:44:45.620] [E/1] [info] created cache directory file:///tmp/tuplex-cache-leonhard/E1
[2022-09-12 22:44:45.620] [memory] [info] allocated 

By default, Google Colab provides some sample data. We can load it into Tuplex with the `csv` command.

In [7]:
ds = c.csv('sample_data/california_housing_train.csv')

[2022-09-12 22:44:56.155] [fileinputoperator] [info] found 0 files (0.00 B) to process.


In [8]:
ds.show(5)

Without any further information, Tuplex automatically deduces types for each column. In order to check what types Tuplex deduced, we can use the `columns` and `types` properties of a Tuplex dataset.

In [9]:
columns = ds.columns
types = ds.types

# print out as nicely formatted dictionary
dict(zip(columns, types))

TypeError: zip argument #1 must support iteration
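The `TypeError` above is what `zip` raises when its first argument is not iterable. That is consistent with the earlier log line reporting 0 files found: presumably `ds.columns` does not hold a list of names when nothing was loaded (an assumption, not confirmed by the Tuplex documentation). A small guard, sketched here in plain Python with the hypothetical helper name `describe_schema`, avoids the crash:

```python
def describe_schema(columns, types):
    """Pair column names with their types.

    Hypothetical helper (not part of Tuplex): returns an empty dict when
    either argument is missing, e.g. because no input file was found.
    """
    if columns is None or types is None:
        return {}
    return dict(zip(columns, types))


# Stand-in values; with a loaded dataset one would pass ds.columns and ds.types.
print(describe_schema(['longitude', 'latitude'], [float, float]))
print(describe_schema(None, None))
```

An empty result is then a hint to double-check the file path before continuing.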

Sometimes, however, it is desirable to assign specific types to individual columns. Luckily, Tuplex provides a mechanism for this as well:

In [10]:
c.csv('sample_data/california_housing_train.csv',  type_hints={'longitude' : float, 'latitude' : str}).show(4)

[2022-09-12 22:45:02.469] [fileinputoperator] [info] found 0 files (0.00 B) to process.
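To illustrate what a type hint means for individual values, here is a plain-Python sketch that does not use Tuplex at all. The helper `parse_with_hints` and the sample rows are made up for illustration; columns without a hint fall back to `float` in this sketch, which is a simplification and not how Tuplex infers types.

```python
import csv
import io

# Made-up sample rows using column names from the housing dataset.
SAMPLE = (
    "longitude,latitude,housing_median_age\n"
    "-114.31,34.19,15.0\n"
    "-114.47,34.40,19.0\n"
)


def parse_with_hints(text, type_hints, default=float):
    """Parse CSV text and convert each field with its hinted type."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        rows.append({name: type_hints.get(name, default)(value)
                     for name, value in record.items()})
    return rows


rows = parse_with_hints(SAMPLE, {'longitude': float, 'latitude': str})
print(rows[0])
```

With the hint `'latitude': str`, the latitude stays a string while the other columns become floats.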


Let's say we now want to create a file containing only data entries where the `housing_median_age` is larger than `50`:

In [11]:
ds.filter(lambda r: r['housing_median_age'] > 50).tocsv('lt50.csv', num_parts=0)

To speed up data output, Tuplex by default uses multiple threads to write multiple output parts.

In [12]:
!head lt50.part0.csv

head: cannot open 'lt50.part0.csv' for reading: No such file or directory
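Because the output is split into parts, it can help to list what was actually written before trying to open a specific part. A minimal sketch using only the standard library; the `<name>.part<N>.csv` naming is taken from the cell above, and the helper name `find_parts` is hypothetical:

```python
import glob
import os


def find_parts(directory, stem, extension):
    """Return paths matching <stem>.part<N>.<extension>, sorted lexicographically."""
    pattern = os.path.join(directory, f"{stem}.part*.{extension}")
    return sorted(glob.glob(pattern))


parts = find_parts('.', 'lt50', 'csv')
if parts:
    print('found output parts:', parts)
else:
    print('no output parts found')
```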


Besides CSV files, Tuplex also has experimental support for reading and writing [ORC files](https://orc.apache.org/), which may be a more space-efficient solution depending on the data and workload.

In [13]:
ds.toorc('lt50.orc')

Similarly, ORC files can be read using the `orc` command.

In [14]:
c.orc('lt50.part0.orc').show(5)

[2022-09-12 22:45:05.208] [fileinputoperator] [info] found 0 files (0.00 B) to process.


### 2.2 Working with larger files
Naturally, the benefit of Tuplex's compilation comes into play when working with larger files. To demonstrate this, let's work with the original NYC 311 data. A subset of it (1 GB) can be downloaded with the following command:

In [18]:
!gdown https://drive.google.com/uc?id=18e2GyoQKLnQ2_uaUcaSOsLRlIT-7tqpN && tar xf 311_subset.tar.gz

Downloading...
From: https://drive.google.com/uc?id=18e2GyoQKLnQ2_uaUcaSOsLRlIT-7tqpN
To: /home/leonhard/projects/2nd-copy/311_subset.tar.gz
100%|████████████████████████████████████████| 214M/214M [00:37<00:00, 5.77MB/s]


Next, let's create a new context with more memory to process the larger file. The old context could be reused, but only at the cost of a lot of disk swapping, so we delete it first to free up memory.

In [37]:
del c

In [38]:
!head 311_subset.csv

Unique Key,Created Date,Closed Date,Agency,Agency Name,Complaint Type,Descriptor,Location Type,Incident Zip,Incident Address,Street Name,Cross Street 1,Cross Street 2,Intersection Street 1,Intersection Street 2,Address Type,City,Landmark,Facility Type,Status,Due Date,Resolution Description,Resolution Action Updated Date,Community Board,BBL,Borough,X Coordinate (State Plane),Y Coordinate (State Plane),Open Data Channel Type,Park Facility Name,Park Borough,Vehicle Type,Taxi Company Borough,Taxi Pick Up Location,Bridge Highway Name,Bridge Highway Direction,Road Ramp,Bridge Highway Segment,Latitude,Longitude,Location
19937896,03/01/2011 02:27:57 PM,03/14/2011 03:59:20 PM,DOF,Refunds and Adjustments,DOF Property - Payment Issue,Misapplied Payment,Property Address,10027,,,,,,,ADDRESS,NEW YORK,,N/A,Closed,03/22/2011 02:27:57 PM,The Department of Finance resolved this issue.,03/14/2011 03:59:20 PM,09 MANHATTAN,1019820050,MANHATTAN,,,PHONE,Unspecified,MANHATTAN,,,,,,,,,,
19937901,03/01/2011 1

In [39]:
c = tuplex.Context({'tuplex.redirectToPythonLogging':True, 'tuplex.executorCount':1, 'tuplex.executorMemory':'2G', 'tuplex.driverMemory':'2G'})

ERROR:root:Failed to start or connect to Tuplex WebUI. Details: MongoDB (mongod) not found on PATH. In order to use Tuplex's WebUI, you need MongoDB installed or point the framework to a running MongoDB instance
INFO:local ee:loaded runtime library from/home/leonhard/.pyenv/versions/3.7.13/lib/python3.7/site-packages/tuplex/libexec/tuplex_runtime.cpython-37m-x86_64-linux-gnu.so
INFO:local ee:initializing LLVM backend
INFO:LLVM:compiling code for skylake
INFO:memory:allocated bitmap managed memory region (2.00 GB, 32.00 MB block size)
INFO:local execution engine:started local executor E/1 (2.00 GB, 32.00 MB default partition size)


Again, we can use Tuplex's autodetection feature to load the file and assign meaningful default types.

In [40]:
ds = c.csv('311_subset.csv')

INFO:E/1:starting detached process queue
INFO:E/1:initialized runtime memory (4.00 MB)
INFO:fileinputoperator:found 1 file (999.08 MB) to process.
INFO:global:sampled file:///home/leonhard/projects/2nd-copy/311_subset.csv on 256.00 KB


In [41]:
dict(zip(ds.columns, ds.types))

{'Unique Key': typing.Union[int, NoneType],
 'Created Date': typing.Union[str, NoneType],
 'Closed Date': typing.Union[str, NoneType],
 'Agency': typing.Union[str, NoneType],
 'Agency Name': typing.Union[str, NoneType],
 'Complaint Type': typing.Union[str, NoneType],
 'Descriptor': typing.Union[str, NoneType],
 'Location Type': typing.Union[str, NoneType],
 'Incident Zip': typing.Union[int, NoneType],
 'Incident Address': typing.Union[str, NoneType],
 'Street Name': typing.Union[str, NoneType],
 'Cross Street 1': typing.Union[str, NoneType],
 'Cross Street 2': typing.Union[str, NoneType],
 'Intersection Street 1': typing.Union[str, NoneType],
 'Intersection Street 2': typing.Union[str, NoneType],
 'Address Type': typing.Union[str, NoneType],
 'City': typing.Union[str, NoneType],
 'Landmark': typing.Union[str, NoneType],
 'Facility Type': typing.Union[str, NoneType],
 'Status': typing.Union[str, NoneType],
 'Due Date': typing.Union[str, NoneType],
 'Resolution Description': typing.Union

Executing even a simple query on the input data creates a logical plan under the hood. Tuplex optimizes it into a physical plan and generates efficient code, which is ultimately lowered to native code optimized for the machine it runs on.

In [42]:
ds.selectColumns(['Unique Key']).show(5)

INFO:logical planner:logical optimization took 0.005175ms
INFO:codegen:generating pipeline for (Option[i64]) -> (Option[i64]) (1 operator pipelined)
INFO:codegen:generating lambda function for ((Option[i64])) -> Option[i64]
INFO:global:Optimization via LLVM passes took 0.012285 ms
INFO:global:starting code compilation
INFO:global:first compile done
INFO:global:functor Stage_0 retrieved from llvm
INFO:global:retrieving init/release stage functors
INFO:global:Compiled code paths for stage 0 in 0.01 ms
INFO:global:[Transform Stage] Stage 0 compiled to x86 in 0.0185374s
INFO:local ee:split /home/leonhard/projects/2nd-copy/311_subset.csv into 15 tasks
INFO:E/1:[Task Finished] Transform to mem in 0.000269s (5 normal rows, 0 exceptions)
INFO:E/1:[Task Finished] Transform to mem in 0.000376s (0 normal rows, 0 exceptions)
INFO:driver:[Task Finished] Transform to mem in 0.000557s (0 normal rows, 0 exceptions)
INFO:E/1:[Task Finished] Transform to mem in 0.000182s (0 normal rows, 0 exceptions)
IN

+------------+
| Unique Key |
+------------+
| 19937896   |
+------------+
| 19937901   |
+------------+
| 19937902   |
+------------+
| 19937903   |
+------------+
| 19937904   |
+------------+


As with every operation, we can retrieve help using Python's built-in documentation feature.

In [43]:
help(ds.selectColumns)

Help on method selectColumns in module tuplex.dataset:

selectColumns(columns) method of tuplex.dataset.DataSet instance
    selects a subset of columns as defined through columns which is a list or a single column
    
    Args:
        columns: list of strings or integers. A string should reference a column name, whereas as an integer refers to an index. Indices may be negative according to python rules. Order in list determines output order
    
    Returns:
        tuplex.dataset.DataSet: A Tuplex Dataset object that allows further ETL operations



For example, the documentation of `selectColumns` shows that integers can be used instead of strings to select columns, which offers more flexibility.

In [44]:
ds.selectColumns([0, 1]).show(3)

INFO:logical planner:logical optimization took 0.003723ms
INFO:codegen:generating pipeline for (Option[i64],Option[str]) -> (Option[i64],Option[str]) (1 operator pipelined)
INFO:codegen:generating lambda function for ((Option[i64],Option[str])) -> (Option[i64],Option[str])
INFO:global:Optimization via LLVM passes took 0.012829 ms
INFO:global:starting code compilation
INFO:global:first compile done
INFO:global:functor Stage_0 retrieved from llvm
INFO:global:retrieving init/release stage functors
INFO:global:Compiled code paths for stage 0 in 0.00 ms
INFO:global:[Transform Stage] Stage 0 compiled to x86 in 0.0181983s
INFO:local ee:split /home/leonhard/projects/2nd-copy/311_subset.csv into 15 tasks
INFO:E/1:[Task Finished] Transform to mem in 0.000188s (3 normal rows, 0 exceptions)
INFO:driver:[Task Finished] Transform to mem in 0.000281s (0 normal rows, 0 exceptions)
INFO:E/1:[Task Finished] Transform to mem in 0.000489s (0 normal rows, 0 exceptions)
INFO:E/1:[Task Finished] Transform to

+------------+--------------------------+
| Unique Key | Created Date             |
+------------+--------------------------+
| 19937896   | '03/01/2011 02:27:57 PM' |
+------------+--------------------------+
| 19937901   | '03/01/2011 10:41:13 AM' |
+------------+--------------------------+
| 19937902   | '03/01/2011 09:07:45 AM' |
+------------+--------------------------+
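The selection rules described in the help text can be modelled in a few lines of plain Python. This sketch only mirrors the documented behaviour (names or indices, negative indices allowed, list order determines output order); `resolve_columns` is a hypothetical helper and not Tuplex's implementation.

```python
def resolve_columns(all_columns, selection):
    """Map a selection (single item or list of names/indices) to column names."""
    if isinstance(selection, (str, int)):
        selection = [selection]  # a single column is also allowed
    resolved = []
    for item in selection:
        if isinstance(item, int):
            # negative indices count from the end, as usual in Python
            resolved.append(all_columns[item])
        elif item in all_columns:
            resolved.append(item)
        else:
            raise KeyError(f"unknown column: {item!r}")
    return resolved


columns = ['Unique Key', 'Created Date', 'Closed Date', 'Agency']
print(resolve_columns(columns, [0, 1]))
print(resolve_columns(columns, [-1, 'Unique Key']))
```

The second call shows that names and indices can be mixed and that the output follows the order of the selection.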


Let's now build a slightly more complicated pipeline. As an initial step, let's investigate what kinds of complaint types there are. To find the corresponding column, we can use the metadata associated with a dataset and then design a first, exploratory query.

In [45]:
def print_table(arr, break_after=5):
    # print the entries of arr in rows of at most break_after items
    for start in range(0, len(arr), break_after):
        print(' | '.join(arr[start:start + break_after]))

print_table(ds.columns)

Unique Key | Created Date | Closed Date | Agency | Agency Name
Complaint Type | Descriptor | Location Type | Incident Zip | Incident Address
Street Name | Cross Street 1 | Cross Street 2 | Intersection Street 1 | Intersection Street 2
Address Type | City | Landmark | Facility Type | Status
Due Date | Resolution Description | Resolution Action Updated Date | Community Board | BBL
Borough | X Coordinate (State Plane) | Y Coordinate (State Plane) | Open Data Channel Type | Park Facility Name
Park Borough | Vehicle Type | Taxi Company Borough | Taxi Pick Up Location | Bridge Highway Name
Bridge Highway Direction | Road Ramp | Bridge Highway Segment | Latitude | Longitude
Location


In [46]:
complaint_types = ds.selectColumns(['Complaint Type']).unique().collect()

INFO:logical planner:logical optimization took 0.003931ms
INFO:codegen:generating pipeline for (Option[str]) -> (Option[str]) (2 operators pipelined)
INFO:codegen:generating lambda function for ((Option[str])) -> Option[str]
tuplex.optimizer.mergeExceptionsInOrder=false
INFO:global:Optimization via LLVM passes took 0.010179 ms
INFO:global:starting code compilation
INFO:global:first compile done
INFO:global:functor Stage_0 retrieved from llvm
INFO:global:retrieving init/release stage functors
INFO:global:Compiled code paths for stage 0 in 0.00 ms
INFO:global:[Transform Stage] Stage 0 compiled to x86 in 0.0143798s
INFO:local ee:split /home/leonhard/projects/2nd-copy/311_subset.csv into 15 tasks
INFO:E/1:[Task Finished] Transform to in-memory hash table in 0.227395s (0 normal rows, 0 exceptions, 129 buckets)
INFO:driver:[Task Finished] Transform to in-memory hash table in 0.221090s (0 normal rows, 0 exceptions, 137 buckets)
INFO:driver:[Task Finished] Transform to in-memory hash table in 

In [47]:
print(complaint_types)

['Mosquitoes', 'DOF Parking - Payment Issue', 'DOF Property - Update Account', 'Street Condition', 'Trans Fat', 'Plumbing', 'Benefit Card Replacement', 'DOF Parking - Address Update', 'Non-Emergency Police Matter', 'Harboring Bees/Wasps', 'Home Delivered Meal - Missed Delivery', 'HPD Literature Request', 'Health', 'Beach/Pool/Sauna Complaint', 'Unsanitary Animal Facility', 'Ferry Complaint', 'Illegal Parking', 'Drug Activity', 'DRIE', 'Dead/Dying Tree', 'Overflowing Litter Baskets', 'Unleashed Dog', 'BEST/Site Safety', 'Vending', 'Sidewalk Condition', 'Highway Sign - Damaged', 'Bridge Condition', 'Public Payphone Complaint', 'Overgrown Tree/Branches', 'Broken Parking Meter', 'Animal-Abuse', 'Taxi Complaint', 'Green Taxi Complaint', 'NONCONST', 'Abandoned Vehicle', 'Noise - Commercial', 'Noise - Helicopter', 'New Tree Request', 'Noise', 'Illegal Fireworks', 'X-Ray Machine/Equipment', 'Discipline and Suspension', 'Animal in a Park', 'Transportation Provider Complaint', 'Tattooing', 'Haza

Looking at the data, we see that there are some complaints regarding mosquitoes. This is likely because it gets quite hot and humid in New York City in summer. Can the data back this up?

To find out, let's plot the number of mosquito complaints per month for the last year. A helpful function for aggregating the results is `aggregateByKey`:

In [48]:
help(ds.aggregateByKey)

Help on method aggregateByKey in module tuplex.dataset:

aggregateByKey(combine, aggregate, initial_value, key_columns) method of tuplex.dataset.DataSet instance
    An experimental aggregateByKey function similar to aggregate. There are several scenarios that do not work with this function yet and its performance hasn't been properly
    optimized either. Data is grouped by the supplied key_columns. Then, for each group a new aggregate is initialized using the initial_value, which can be thought of as a neutral value.
    The aggregate function is then called for each element and the current aggregate structure. It is guaranteed that the combine function is called at least once per group by applying the initial_value to the aggregate.
    Args:
        combine: a UDF to combine two aggregates (results of the aggregate function or the initial_value). E.g., cobmine = lambda agg1, agg2: agg1 + agg2. The initial value should be the neutral element.
        aggregate: a UDF which produces 

Next, let's use UDFs to extract the month and year of each complaint and restrict the search to mosquito-related complaint types, so that Tuplex automatically processes fewer rows.

In [49]:
ds.selectColumns(['Created Date']).show(5)

INFO:logical planner:logical optimization took 0.004637ms
INFO:codegen:generating pipeline for (Option[str]) -> (Option[str]) (1 operator pipelined)
INFO:codegen:generating lambda function for ((Option[str])) -> Option[str]
INFO:global:Optimization via LLVM passes took 0.010873 ms
INFO:global:starting code compilation
INFO:global:first compile done
INFO:global:functor Stage_0 retrieved from llvm
INFO:global:retrieving init/release stage functors
INFO:global:Compiled code paths for stage 0 in 0.00 ms
INFO:global:[Transform Stage] Stage 0 compiled to x86 in 0.0160895s
INFO:local ee:split /home/leonhard/projects/2nd-copy/311_subset.csv into 15 tasks
INFO:E/1:[Task Finished] Transform to mem in 0.000198s (5 normal rows, 0 exceptions)
INFO:driver:[Task Finished] Transform to mem in 0.000220s (0 normal rows, 0 exceptions)
INFO:E/1:[Task Finished] Transform to mem in 0.000408s (0 normal rows, 0 exceptions)
INFO:E/1:[Task Finished] Transform to mem in 0.000380s (0 normal rows, 0 exceptions)
IN

+--------------------------+
| Created Date             |
+--------------------------+
| '03/01/2011 02:27:57 PM' |
+--------------------------+
| '03/01/2011 10:41:13 AM' |
+--------------------------+
| '03/01/2011 09:07:45 AM' |
+--------------------------+
| '03/01/2011 05:39:26 PM' |
+--------------------------+
| '03/01/2011 11:08:14 AM' |
+--------------------------+


In [50]:
year_to_investigate = 2019

def extract_month(row):
  date = row['Created Date']
  date = date[:date.find(' ')]
  return int(date.split('/')[0])

def extract_year(row):
  date = row['Created Date']
  date = date[:date.find(' ')]
  return int(date.split('/')[-1])

ds2 = ds.withColumn('Month', extract_month) \
  .withColumn('Year', extract_year) \
  .filter(lambda row: 'Mosquito' in row['Complaint Type']) \
  .filter(lambda row: row['Year'] == year_to_investigate) \
  .selectColumns(['Month', 'Year', 'Complaint Type'])


ds2.show(5)


INFO:logical planner:logical optimization took 0.015383ms
INFO:codegen:generating pipeline for (Option[str],Option[str]) -> (i64,i64,Option[str]) (5 operators pipelined)
INFO:codegen:generating lambda function for ((Option[str],Option[str])) -> boolean
INFO:codegen:generating function extract_month for ((Option[str],Option[str])) -> i64
INFO:codegen:generating function extract_year for ((Option[str],Option[str],i64)) -> i64
INFO:codegen:generating lambda function for ((Option[str],Option[str],i64,i64)) -> boolean
INFO:codegen:generating lambda function for ((Option[str],Option[str],i64,i64)) -> (i64,i64,Option[str])
INFO:global:Optimization via LLVM passes took 0.033716 ms
INFO:global:starting code compilation
INFO:global:first compile done
INFO:global:functor Stage_0 retrieved from llvm
INFO:global:retrieving init/release stage functors
INFO:global:Compiled code paths for stage 0 in 0.01 ms
INFO:global:[Transform Stage] Stage 0 compiled to x86 in 0.0496439s
INFO:local ee:split /home/l

+-------+------+----------------+
| Month | Year | Complaint Type |
+-------+------+----------------+
| 12    | 2019 | 'Mosquitoes'   |
+-------+------+----------------+
| 12    | 2019 | 'Mosquitoes'   |
+-------+------+----------------+
| 12    | 2019 | 'Mosquitoes'   |
+-------+------+----------------+
| 7     | 2019 | 'Mosquitoes'   |
+-------+------+----------------+
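The two UDFs rely on string slicing. Outside of a Tuplex pipeline, the same fields can be extracted with the standard library's `datetime.strptime`, which additionally validates the timestamp. This is a plain-Python sketch; whether Tuplex can compile `strptime` inside a UDF is not covered here. The format string is inferred from the sample values of the `Created Date` column, and `month_and_year` is a hypothetical helper name.

```python
from datetime import datetime

# Format inferred from values such as '03/01/2011 02:27:57 PM'.
DATE_FORMAT = '%m/%d/%Y %I:%M:%S %p'


def month_and_year(created_date):
    """Return (month, year) for a 311 'Created Date' string."""
    parsed = datetime.strptime(created_date, DATE_FORMAT)
    return parsed.month, parsed.year


print(month_and_year('03/01/2011 02:27:57 PM'))
```

A malformed timestamp raises a `ValueError` here, whereas the slicing approach may silently return an unexpected number.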


We can now use the aggregateByKey function to count the number of mosquito complaints per month in 2019.

In [51]:
def combine_udf(a, b):
  return a + b

def aggregate_udf(agg, row):
  return agg + 1

ds2.aggregateByKey(combine_udf, aggregate_udf, 0, ["Month"]).show()

INFO:logical planner:logical optimization took 0.018371ms
INFO:codegen:generating pipeline for (Option[str],Option[str]) -> (i64,i64) (6 operators pipelined)
INFO:codegen:generating lambda function for ((Option[str],Option[str])) -> boolean
INFO:codegen:generating function extract_month for ((Option[str],Option[str])) -> i64
INFO:codegen:generating function extract_year for ((Option[str],Option[str],i64)) -> i64
INFO:codegen:generating lambda function for ((Option[str],Option[str],i64,i64)) -> boolean
INFO:codegen:generating lambda function for ((Option[str],Option[str],i64,i64)) -> (i64,i64,Option[str])
INFO:codegen:generating function combine_udf for (i64,i64) -> i64
INFO:codegen:generating function aggregate_udf for (i64,(i64,i64,Option[str])) -> i64
tuplex.optimizer.mergeExceptionsInOrder=false
INFO:global:Optimization via LLVM passes took 0.037240 ms
INFO:global:starting code compilation
INFO:global:first compile done
INFO:global:functor Stage_0 retrieved from llvm
INFO:global:ret

+-------+---+
| Month |   |
+-------+---+
| 12    | 3 |
+-------+---+
| 7     | 1 |
+-------+---+
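To make the roles of `combine`, `aggregate` and the initial value more concrete, here is a plain-Python model of the semantics described in the help text. It is a sketch under assumptions: the split into two partitions is made up, `aggregate_by_key` is a hypothetical helper, and the way Tuplex actually schedules and merges partial aggregates may differ.

```python
def aggregate_by_key(partitions, combine, aggregate, initial_value, key_func):
    """Group rows by key, aggregate per partition, then combine the partials."""
    partials = []
    for rows in partitions:
        local = {}
        for row in rows:
            key = key_func(row)
            # each group starts from the neutral initial value
            local[key] = aggregate(local.get(key, initial_value), row)
        partials.append(local)

    result = {}
    for local in partials:
        for key, value in local.items():
            result[key] = combine(result.get(key, initial_value), value)
    return result


# Four rows mirroring the 2019 mosquito complaints, split into two made-up partitions.
partitions = [
    [{'Month': 12}, {'Month': 12}],
    [{'Month': 12}, {'Month': 7}],
]
counts = aggregate_by_key(
    partitions,
    combine=lambda a, b: a + b,
    aggregate=lambda agg, row: agg + 1,
    initial_value=0,
    key_func=lambda row: row['Month'],
)
print(counts)
```

The model makes clear why the initial value should be the neutral element: it is used both when a group is first seen and when partial results are merged.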


Yet mosquito complaints turn out to be quite rare. In total, there are only 4 complaints for the whole year, 3 of them in December. With so little support, we can't draw any meaningful conclusions about mosquitoes in NYC from the 311 dataset.

Let's step back and check which kind of complaint is actually the most common:

In [52]:
ds.aggregateByKey(combine_udf, aggregate_udf, 0, ["Complaint Type"]).show()

INFO:logical planner:logical optimization took 0.002201ms
INFO:codegen:generating pipeline for (Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[i64],Option[str],Option[i64],Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[f64],Option[f64],Option[str]) -> (Option[str],i64) (1 operator pipelined)
INFO:codegen:generating function combine_udf for (i64,i64) -> i64
INFO:codegen:generating function aggregate_udf for (i64,(Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Opti

INFO:driver:[Task Finished] Resolve in 0.011660s
INFO:E/1:[Task Finished] Resolve in 0.015524s
INFO:driver:[Task Finished] Resolve in 0.025153s
INFO:E/1:[Task Finished] Resolve in 0.030859s
INFO:local ee:slow path resolved 510/510 exceptions in 0.248894s
INFO:local ee:slow path for Stage 0: total wall clock time: 0.462249s, time to process 1 row via slow path: 0.906371ms
INFO:global:[Transform Stage] Stage 0 completed 15 resolve tasks in 0.248915s
INFO:global:[Transform Stage] Stage 0 completed 15 sink tasks in 0.00174519s
INFO:global:[Transform Stage] Stage 0 took 2.74754s
INFO:global:[Transform Stage] skipped stage 1 because there is nothing todo here.
INFO:global:Query Execution took 2.7727s. (planning: 0.0225033s, execution: 2.75019s)
INFO:global:Collecting result of 222 rows took 0.001815 seconds


To see what the most common complaint is, let's sort the output:

In [53]:
data = ds.aggregateByKey(combine_udf, aggregate_udf, 0, ["Complaint Type"]).collect()

sorted(data, key=lambda x: x[1])

data[:5]

INFO:logical planner:logical optimization took 0.003317ms
INFO:codegen:generating pipeline for (Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[i64],Option[str],Option[i64],Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[f64],Option[f64],Option[str]) -> (Option[str],i64) (1 operator pipelined)
INFO:codegen:generating function combine_udf for (i64,i64) -> i64
INFO:codegen:generating function aggregate_udf for (i64,(Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[i64],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Option[str],Opti

INFO:local ee:slow path resolved 510/510 exceptions in 0.223588s
INFO:local ee:slow path for Stage 0: total wall clock time: 0.401403s, time to process 1 row via slow path: 0.787065ms
INFO:global:[Transform Stage] Stage 0 completed 15 resolve tasks in 0.223618s
INFO:global:[Transform Stage] Stage 0 completed 15 sink tasks in 0.00194741s
INFO:global:[Transform Stage] Stage 0 took 2.78945s
INFO:global:[Transform Stage] skipped stage 1 because there is nothing todo here.
INFO:global:Query Execution took 2.81643s. (planning: 0.0238355s, execution: 2.7926s)
INFO:python:Data transfer back to Python took 0.001298 seconds


[('Mosquitoes', 4),
 ('DOF Parking - Payment Issue', 19487),
 ('DOF Property - Update Account', 65),
 ('Street Condition', 95585),
 ('Trans Fat', 6)]

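The five entries above are still in their original order, because `sorted` returns a new list and its result was not assigned. Among them, `Street Condition` has the highest count (95,585); to find the most common complaint across all complaint types, assign the sorted list and read its first entry.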
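A minimal sketch of ranking aggregated counts in plain Python. `sorted` returns a new list and leaves its input untouched, so the result must be assigned; `reverse=True` puts the largest count first. The list below contains only the five tuples visible in the output above, so the top entry here is not necessarily the most common complaint in the full dataset.

```python
# The five (complaint type, count) tuples shown in the output above.
data = [
    ('Mosquitoes', 4),
    ('DOF Parking - Payment Issue', 19487),
    ('DOF Property - Update Account', 65),
    ('Street Condition', 95585),
    ('Trans Fat', 6),
]

# Sort by count, largest first, and keep the result.
ranked = sorted(data, key=lambda x: x[1], reverse=True)
print(ranked[:3])
```

Applied to the full result of `collect()`, `ranked[0]` would then hold the most common complaint type and its count.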

(c) 2017 - 2022 Tuplex team