In [20]:
using IterTools
using DataFrames
using LibPQ
using BenchmarkTools
using CSV
using Tables

## Download Test Data

In [5]:
download("https://nyc-tlc.s3.amazonaws.com/trip+data/green_tripdata_2019-12.csv", 
    "test_data.csv")

"test_data.csv"

## Create Connection

In [21]:
# Connection URI for the test database. Set the `LIBPQ_TEST_CON_STR` environment variable
# to point the notebook at another server without editing (and committing) credentials;
# the literal below is only the fallback for the local tutorial instance.
con_str = get(ENV, "LIBPQ_TEST_CON_STR",
    "postgres://postgres:python_tutorial_5432@192.168.1.24:15432/postgres")

"postgres://postgres:python_tutorial_5432@192.168.1.24:15432/postgres"

In [22]:
con = LibPQ.Connection(con_str)

PostgreSQL connection (CONNECTION_OK) with parameters:
  user = postgres
  password = ********************
  dbname = postgres
  host = 192.168.1.24
  port = 15432
  client_encoding = UTF8
  options = -c DateStyle=ISO,YMD -c IntervalStyle=iso_8601 -c TimeZone=UTC
  application_name = LibPQ.jl
  sslmode = prefer
  sslcompression = 0
  gssencmode = disable
  target_session_attrs = any

## Create Test Table

In [4]:
# `if exists` keeps this cell re-runnable on a fresh database where the table
# has not been created yet (a plain `drop table` would raise an error there).
execute(con, "drop table if exists test_df")

PostgreSQL result

In [5]:
sql = """
create table test_df (
    trip_reason varchar,
    lpep_pickup_datetime timestamp, 
    passenger_count int, 
    trip_distance numeric
);"""
execute(con, sql)

PostgreSQL result

## Create Test Data

In [23]:
df = CSV.File("test_data.csv") |> DataFrame;

In [24]:
df_sample = df[:, [:lpep_pickup_datetime, :passenger_count, :trip_distance]];
#dropmissing!(df_sample);

In [40]:
sample_str = "for, more; \$, and \"fun\""
print(sample_str)

for, more; $, and "fun"

In [41]:
df_sample[!, :trip_reason] .= sample_str;
first(df_sample, 5)

Unnamed: 0_level_0,lpep_pickup_datetime,passenger_count,trip_distance,trip_reason
Unnamed: 0_level_1,String,Int64⍰,Float64,String
1,2019-12-01 00:09:45,1,0.0,"for, more; $, and ""fun"""
2,2019-12-01 00:26:05,1,0.67,"for, more; $, and ""fun"""
3,2019-12-01 00:56:36,1,0.61,"for, more; $, and ""fun"""
4,2019-12-01 00:26:20,1,3.9,"for, more; $, and ""fun"""
5,2019-12-01 00:56:36,1,0.5,"for, more; $, and ""fun"""


In [34]:
names(df_sample)

4-element Array{Symbol,1}:
 :lpep_pickup_datetime
 :passenger_count     
 :trip_distance       
 :trip_reason         

## Definition of Upload Function

In [45]:
# Turn one cell value into the piece of text that is placed between the commas
# of a CSV line.

# Fallback: any value without a more specific method is handed back untouched.
function _prepare_field(value)
    return value
end

# `missing` is written as an empty, unquoted field.
function _prepare_field(::Missing)
    return ""
end

# Strings are wrapped in double quotes; every embedded double quote is doubled.
function _prepare_field(text::AbstractString)
    escaped = replace(text, "\"" => "\"\"")
    return "\"" * escaped * "\""
end

_prepare_field (generic function with 3 methods)

In [46]:
"""
    load_by_copy!(table, con:: LibPQ.Connection, tablename:: AbstractString)

Fast data upload using the PostgreSQL `COPY FROM STDIN` method, which is usually much faster,
especially for large data amounts, than SQL Inserts.

`table` must be a Tables.jl compatible data structure.

All columns given in `table` must have corresponding fields in the target DB table,
the order of the columns does not matter.

Columns in the target DB table, which are not provided by the input `table`, are filled 
with `null` (provided they are nullable).
"""
function load_by_copy!(table, con:: LibPQ.Connection, tablename:: AbstractString)
    row_names = join(string.(Tables.columnnames(table)), ",")
    row_strings = imap(Tables.eachrow(table)) do row
        join((_prepare_field(x) for x in row), ",")*"\n"
    end
    copyin = LibPQ.CopyIn("COPY $tablename ($row_names) FROM STDIN (FORMAT CSV);", row_strings)
    execute(con, copyin)
end      

load_by_copy!

## Test

In [47]:
print(_prepare_field(sample_str))

"for, more; $, and ""fun"""

In [52]:
execute(con, "delete from test_df;")

PostgreSQL result

In [53]:
@time load_by_copy!(df_sample, con, "test_df")

  7.111131 seconds (20.09 M allocations: 746.647 MiB, 7.18% gc time)


PostgreSQL result

In [54]:
@time load_by_copy!(df_sample[!, reverse(names(df_sample))], con, "test_df")

  6.447098 seconds (20.09 M allocations: 759.458 MiB, 4.70% gc time)


PostgreSQL result

In [55]:
execute(con, "select count(*) from test_df") |> DataFrame

Unnamed: 0_level_0,count
Unnamed: 0_level_1,Int64⍰
1,901254


## Check DB

In [51]:
df_db = execute(con, "select * from test_df") |> DataFrame

Unnamed: 0_level_0,trip_reason,lpep_pickup_datetime,passenger_count,trip_distance
Unnamed: 0_level_1,String⍰,Dates…⍰,Int32⍰,Decimal…⍰
1,"for, more; $, and ""fun""",2019-12-01T00:09:45,1,"Decimal(0, 0, 0)"
2,"for, more; $, and ""fun""",2019-12-01T02:39:29,1,"Decimal(0, 0, 0)"
3,"for, more; $, and ""fun""",2019-12-01T02:40:30,1,"Decimal(0, 0, 0)"
4,"for, more; $, and ""fun""",2019-12-01T09:44:00,missing,"Decimal(0, 467, -2)"
5,"for, more; $, and ""fun""",2019-12-01T09:25:00,missing,"Decimal(0, 415, -2)"
6,"for, more; $, and ""fun""",2019-12-01T09:00:00,missing,"Decimal(0, 38, -1)"
7,"for, more; $, and ""fun""",2019-12-01T09:16:00,missing,"Decimal(0, 44, -2)"
8,"for, more; $, and ""fun""",2019-12-01T09:57:00,missing,"Decimal(0, 35, -2)"
9,"for, more; $, and ""fun""",2019-12-01T09:43:00,missing,"Decimal(0, 107, -2)"
10,"for, more; $, and ""fun""",2019-12-01T09:16:00,missing,"Decimal(0, 205, -2)"


## Comparison to SQL Insert

In [16]:
# Time a row-wise INSERT of the same data inside one explicit transaction.
@time begin
    execute(con, "BEGIN;")
    try
        LibPQ.load!(df_sample, con,
            """INSERT INTO test_df (lpep_pickup_datetime, passenger_count, 
                trip_distance, trip_reason) VALUES (\$1, \$2, \$3, \$4);""")
        execute(con, "COMMIT;")
    catch
        # Do not leave the connection stuck in an aborted transaction when the load fails.
        execute(con, "ROLLBACK;")
        rethrow()
    end
end

 97.618149 seconds (38.33 M allocations: 2.109 GiB, 0.86% gc time)


PostgreSQL result

In [17]:
execute(con, "select count(*) from test_df") |> DataFrame

Unnamed: 0_level_0,count
Unnamed: 0_level_1,Int64⍰
1,1351881


The `load_by_copy!` method is more than 10 times faster for inserting data into PostgreSQL!

## Close Connection

In [18]:
close(con)