In [None]:
#r "nuget: Parquet.Net, 5.0.2"

using Parquet;
using Parquet.Data;
using Parquet.Schema;
using System.Net.Http;
using System.Threading.Tasks;
using System.IO;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Taxi trip data set: remote source and the local file name it is saved under.
var taxiUrl = "https://blobs.duckdb.org/data/taxi-data-2019.parquet";
var taxiFile = "taxi-data-2019.parquet";

// Zone lookup data set: remote source and the local file name it is saved under.
var zoneUrl = "https://blobs.duckdb.org/data/zone-lookups.parquet";
var zoneFile = "zone-lookups.parquet";


In [None]:
/// <summary>
/// Downloads the resource at <paramref name="url"/> and writes it to
/// <paramref name="fileName"/>. The target is opened with FileMode.Create,
/// so an existing file of the same name is overwritten.
/// </summary>
/// <param name="url">Address of the resource to download.</param>
/// <param name="fileName">Path of the file to write.</param>
async Task SaveFileToDisk(string url, string fileName)
{
    using (var httpClient = new HttpClient())
    using (var responseBody = await httpClient.GetStreamAsync(url))
    using (var target = new FileStream(fileName, FileMode.Create, FileAccess.Write, FileShare.None))
    {
        // Copy the response body straight into the file on disk.
        await responseBody.CopyToAsync(target);
    }
}




In [None]:
// Download the taxi trip file and report how long the download took.
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
await SaveFileToDisk(taxiUrl, taxiFile);
$"Downloaded {taxiFile} in {sw.ElapsedMilliseconds}ms"

Downloaded taxi-data-2019.parquet in 29334ms

In [None]:
// Download the zone lookup file and report how long the download took.
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
await SaveFileToDisk(zoneUrl, zoneFile);
$"Downloaded {zoneFile} in {sw.ElapsedMilliseconds}ms"

Downloaded zone-lookups.parquet in 350ms

In [None]:


/// <summary>
/// Reads every row group of a Parquet file and creates one
/// <typeparamref name="T"/> instance per row.
/// </summary>
/// <remarks>
/// Columns are mapped to properties of <typeparamref name="T"/> by exact
/// (case-sensitive) name. Columns without a matching settable property are
/// never read; properties without a matching column keep their default value.
/// </remarks>
/// <typeparam name="T">Row type with a parameterless constructor.</typeparam>
/// <param name="fileName">Path of the Parquet file to read.</param>
/// <returns>All rows, in row-group order and row order within each group.</returns>
public async Task<IReadOnlyCollection<T>> LoadParquetDataAsync<T>(string fileName) where T : new()
{
    // Only properties that actually have a set accessor (and are not indexers)
    // can receive a column value. A get-only property has a null SetMethod and
    // would otherwise fail with a NullReferenceException on Invoke.
    var setters = typeof(T).GetProperties()
        .Where(p => p.SetMethod != null && p.GetIndexParameters().Length == 0)
        .ToDictionary(p => p.Name, p => p.SetMethod);

    // The file is only read, so other readers do not need to be locked out.
    using var fileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read);

    var options = new ParquetOptions { TreatByteArrayAsString = true };
    using var parquetReader = await ParquetReader.CreateAsync(fileStream, options);

    // The schema belongs to the file, not to a row group: resolve the mapped
    // columns once instead of once per row group.
    var mappedFields = parquetReader.Schema.GetDataFields()
        .Where(field => setters.ContainsKey(field.Name))
        .ToArray();

    var data = new List<T>();

    // Reused for every setter call to avoid allocating one array per cell value.
    var setterArgs = new object[1];

    for (int i = 0; i < parquetReader.RowGroupCount; i++)
    {
        using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(i))
        {
            var rowCount = checked((int)groupReader.RowCount);

            // Create the row objects first; each column pass below fills one property.
            var instances = new T[rowCount];
            for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
            {
                instances[rowIndex] = new T();
            }
            data.AddRange(instances);

            foreach (var field in mappedFields)
            {
                var setter = setters[field.Name];
                var column = await groupReader.ReadColumnAsync(field);
                var values = column.Data;

                for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
                {
                    setterArgs[0] = values.GetValue(rowIndex);
                    setter.Invoke(instances[rowIndex], setterArgs);
                }
            }
        }
    }

    return data;
}

In [None]:
/// <summary>
/// One row of the taxi trip file, as materialized by LoadParquetDataAsync.
/// That loader matches Parquet column names to property names, so the
/// property names here must stay exactly as written.
/// </summary>
record TaxiTrip
{
    // Shown as <null> in the preview output after loading, i.e. not filled from the file.
    // Presumably intended to hold borough names resolved via Zone — TODO confirm.
    public string PickupBorough { get; set; }
    public string DropoffBorough { get; set; }
    // Filled from the identically named Parquet columns; compared against Zone.LocationID in the queries.
    public long pickup_location_id { get; set; }
    public long dropoff_location_id { get; set; }
}

/// <summary>
/// One row of the zone lookup file, as materialized by LoadParquetDataAsync.
/// That loader matches Parquet column names to property names, so the
/// property names here must stay exactly as written.
/// </summary>
record Zone
{
    // Zone identifier; the queries compare it with the trips' pickup/dropoff location ids.
    public long LocationID { get; set; }
    // Borough name of the zone (e.g. "Manhattan", "Queens" in the preview output).
    public string Borough { get; set; }
}

In [None]:



// Load every taxi trip into memory and time the load.
var sw = System.Diagnostics.Stopwatch.StartNew();
var trips = await LoadParquetDataAsync<TaxiTrip>(taxiFile);
sw.Stop();

// Format Elapsed explicitly: Stopwatch.ToString() only prints the elapsed time
// on .NET 7 and later; earlier runtimes print the type name instead.
$"Time to load file and instantiate: {sw.Elapsed}"


Time to load file and instantiate: 00:01:41.5484720

In [None]:
// Show the first few loaded trips to check which properties were filled.
int tripPreviewSize = 10;
trips.Take(tripPreviewSize)

index,value
,
,
,
,
,
,
,
,
,
,

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,164
dropoff_location_id,233

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,234
dropoff_location_id,231

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,229
dropoff_location_id,162

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,234
dropoff_location_id,249

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,249
dropoff_location_id,4

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,42
dropoff_location_id,141

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,163
dropoff_location_id,87

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,132
dropoff_location_id,62

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,230
dropoff_location_id,43

Unnamed: 0,Unnamed: 1
PickupBorough,<null>
DropoffBorough,<null>
pickup_location_id,142
dropoff_location_id,237


In [None]:



// Load the zone lookup table into memory and time the load.
var sw = System.Diagnostics.Stopwatch.StartNew();
var zones = await LoadParquetDataAsync<Zone>(zoneFile);
sw.Stop();

// Format Elapsed explicitly: Stopwatch.ToString() only prints the elapsed time
// on .NET 7 and later; earlier runtimes print the type name instead.
$"Time to load file and instantiate: {sw.Elapsed}"

Time to load file and instantiate: 00:00:00.0050833

In [None]:
// Show the first few loaded zones to check which properties were filled.
int zonePreviewSize = 10;
zones.Take(zonePreviewSize)

index,value
,
,
,
,
,
,
,
,
,
,

Unnamed: 0,Unnamed: 1
LocationID,1
Borough,EWR

Unnamed: 0,Unnamed: 1
LocationID,2
Borough,Queens

Unnamed: 0,Unnamed: 1
LocationID,3
Borough,Bronx

Unnamed: 0,Unnamed: 1
LocationID,4
Borough,Manhattan

Unnamed: 0,Unnamed: 1
LocationID,5
Borough,Staten Island

Unnamed: 0,Unnamed: 1
LocationID,6
Borough,Staten Island

Unnamed: 0,Unnamed: 1
LocationID,7
Borough,Queens

Unnamed: 0,Unnamed: 1
LocationID,8
Borough,Queens

Unnamed: 0,Unnamed: 1
LocationID,9
Borough,Queens

Unnamed: 0,Unnamed: 1
LocationID,10
Borough,Queens


First, unoptimized idea for a query: nothing is indexed, and the data is used exactly as it comes from the data source.

In [None]:
// Unoptimized query: scan all trips, keep those that start or end in Manhattan,
// count them per (pickup, dropoff) pair and return the five most frequent pairs.
var sw = System.Diagnostics.Stopwatch.StartNew();

// Location ids of all zones whose borough is Manhattan.
var manhattanZones = zones.Where(x => x.Borough == "Manhattan").Select(zone => zone.LocationID).ToHashSet();
var query = trips.Where(trip => manhattanZones.Contains(trip.pickup_location_id) || manhattanZones.Contains(trip.dropoff_location_id))
.GroupBy(trip => (trip.pickup_location_id, trip.dropoff_location_id))
.Select(group => new { group.Key.pickup_location_id, group.Key.dropoff_location_id, Count = group.Count() })
.OrderByDescending(x => x.Count)
.Take(5)
.ToList();
sw.Stop();

// Format Elapsed explicitly: Stopwatch.ToString() only prints the elapsed time
// on .NET 7 and later; earlier runtimes print the type name instead.
Console.WriteLine($" Time to query: {sw.Elapsed}");
query

 Time to query: 00:00:20.1773442


index,value
,
,
,
,
,
0,"{ pickup_location_id = 237, dropoff_location_id = 236, Count = 536621 }pickup_location_id237dropoff_location_id236Count536621"
,
pickup_location_id,237
dropoff_location_id,236
Count,536621

Unnamed: 0,Unnamed: 1
pickup_location_id,237
dropoff_location_id,236
Count,536621

Unnamed: 0,Unnamed: 1
pickup_location_id,236
dropoff_location_id,237
Count,455954

Unnamed: 0,Unnamed: 1
pickup_location_id,236
dropoff_location_id,236
Count,451805

Unnamed: 0,Unnamed: 1
pickup_location_id,237
dropoff_location_id,237
Count,435054

Unnamed: 0,Unnamed: 1
pickup_location_id,239
dropoff_location_id,238
Count,236737


Idea for the report: given the borough, we want its most frequent trips. Normally we would already aggregate the data like this while reading it, but to avoid reading the file twice we do it here as a follow-up step.

In [None]:
// Pre-aggregate once: number of trips per (pickup, dropoff) pair over all trips.
// The array is ordered by Count descending; the indexed query that follows
// relies on that order when it takes the first matches.
var sw = System.Diagnostics.Stopwatch.StartNew();
var numberOfTripsByPickupAndDropOff = trips
                .GroupBy(trip => (trip.pickup_location_id, trip.dropoff_location_id))
                .Select(group => new { group.Key.pickup_location_id, group.Key.dropoff_location_id, Count = group.Count() })
                .OrderByDescending(x => x.Count)
                .ToArray();
sw.Stop();

// Format Elapsed explicitly: Stopwatch.ToString() only prints the elapsed time
// on .NET 7 and later; earlier runtimes print the type name instead.
Console.WriteLine($" Time to build index: {sw.Elapsed}");

 Time to build index: 00:00:23.9330322


In [None]:
// Same question as the unoptimized query, answered from the pre-aggregated counts.
var sw = System.Diagnostics.Stopwatch.StartNew();

// Location ids of all zones whose borough is Manhattan.
var manhattanZones = zones.Where(x => x.Borough == "Manhattan").Select(zone => zone.LocationID).ToHashSet();

// numberOfTripsByPickupAndDropOff is already sorted by Count descending, so the
// first five matching entries are the five most frequent Manhattan pairs.
var query = numberOfTripsByPickupAndDropOff.Where(trip => manhattanZones.Contains(trip.pickup_location_id) || manhattanZones.Contains(trip.dropoff_location_id))
.Take(5)
.ToList();
sw.Stop();

// Format Elapsed explicitly: Stopwatch.ToString() only prints the elapsed time
// on .NET 7 and later; earlier runtimes print the type name instead.
Console.WriteLine($" Time to query: {sw.Elapsed}");
query

 Time to query: 00:00:00.0006824


index,value
,
,
,
,
,
0,"{ pickup_location_id = 237, dropoff_location_id = 236, Count = 536621 }pickup_location_id237dropoff_location_id236Count536621"
,
pickup_location_id,237
dropoff_location_id,236
Count,536621

Unnamed: 0,Unnamed: 1
pickup_location_id,237
dropoff_location_id,236
Count,536621

Unnamed: 0,Unnamed: 1
pickup_location_id,236
dropoff_location_id,237
Count,455954

Unnamed: 0,Unnamed: 1
pickup_location_id,236
dropoff_location_id,236
Count,451805

Unnamed: 0,Unnamed: 1
pickup_location_id,237
dropoff_location_id,237
Count,435054

Unnamed: 0,Unnamed: 1
pickup_location_id,239
dropoff_location_id,238
Count,236737
