Skip to content

Commit

Permalink
LOAD DATA and COPY usage samples
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeronimo Lopez committed May 5, 2018
1 parent f770073 commit 44a8090
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 0 deletions.
91 changes: 91 additions & 0 deletions src/main/java/com/jerolba/benchmark/CopyInsert.java
@@ -0,0 +1,91 @@
package com.jerolba.benchmark;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.function.Supplier;

import org.postgresql.copy.CopyManager;
import org.postgresql.jdbc.PgConnection;

import com.jerolba.benchmark.shared.BenchmarkMeter;
import com.jerolba.benchmark.shared.CityBikeParser;
import com.jerolba.benchmark.shared.CityBikeReader;
import com.jerolba.benchmark.shared.ConnectionProvider;
import com.jerolba.benchmark.shared.TableHelper;
import com.jerolba.benchmark.shared.TripEntity;

public class CopyInsert {

private final static String COPY = "COPY bike_trip (tripduration, starttime, stoptime, "
+ "start_station_id, start_station_name, start_station_latitude, start_station_longitude, "
+ "end_station_id, end_station_name, end_station_latitude, end_station_longitude, bike_id, "
+ "user_type, birth_year, gender) FROM STDIN WITH (FORMAT TEXT, ENCODING 'UTF-8', DELIMITER '\t', HEADER false)";

private static final SimpleDateFormat sdfDateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws IOException, SQLException {
String properties = args[0];
int batchSize = Integer.parseInt(args[1]);

Supplier<Connection> connectionSuplier = new ConnectionProvider(properties);
try (Connection connection = connectionSuplier.get()) {
TableHelper.createTable(connection);

CityBikeParser<TripEntity> parser = new CityBikeParser<>(() -> new TripEntity());
CityBikeReader<TripEntity> reader = new CityBikeReader<>("/tmp", str -> parser.parse(str));

PgConnection unwrapped = connection.unwrap(PgConnection.class);
CopyManager copyManager = unwrapped.getCopyAPI();

BenchmarkMeter meter = new BenchmarkMeter(JpaSimpleInsert.class, properties, batchSize);
meter.meter(() -> reader.forEachCsvInZip(trips -> {
try {
int cont = 0;
StringBuilder sb = new StringBuilder();
Iterator<TripEntity> iterator = trips.iterator();
while (iterator.hasNext()) {
TripEntity trip = iterator.next();
sb.append(trip.getTripduration()).append("\t");
sb.append(sdfDateTime.format(trip.getStarttime())).append("\t");
sb.append(sdfDateTime.format(trip.getStoptime())).append("\t");
sb.append(trip.getStartStationId()).append("\t");
sb.append(trip.getStartStationName()).append("\t");
sb.append(trip.getStartStationLatitude()).append("\t");
sb.append(trip.getStartStationLongitude()).append("\t");
sb.append(trip.getEndStationId()).append("\t");
sb.append(trip.getEndStationName()).append("\t");
sb.append(trip.getEndStationLatitude()).append("\t");
sb.append(trip.getEndStationLongitude()).append("\t");
sb.append(trip.getBikeId()).append("\t");
sb.append(trip.getUserType()).append("\t");
sb.append(nullify(trip.getBirthYear())).append("\t");
sb.append(trip.getGender());
sb.append("\n");
cont++;
if (cont % batchSize == 0) {
InputStream is = new ByteArrayInputStream(sb.toString().getBytes());
copyManager.copyIn(COPY, is);
sb.setLength(0);
}
}
} catch (Exception e) {
e.printStackTrace();
}})
);

}
}

private static String nullify(Integer value) {
if (value==null) {
return "\\N";
}
return value.toString();
}

}
92 changes: 92 additions & 0 deletions src/main/java/com/jerolba/benchmark/LoadDataInsert.java
@@ -0,0 +1,92 @@
package com.jerolba.benchmark;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.function.Supplier;

import com.jerolba.benchmark.shared.BenchmarkMeter;
import com.jerolba.benchmark.shared.CityBikeParser;
import com.jerolba.benchmark.shared.CityBikeReader;
import com.jerolba.benchmark.shared.ConnectionProvider;
import com.jerolba.benchmark.shared.TableHelper;
import com.jerolba.benchmark.shared.TripEntity;
import com.mysql.jdbc.Statement;

public class LoadDataInsert {

private final static String LOADDATA = "LOAD DATA LOCAL INFILE '' INTO TABLE `bike_trip` CHARACTER SET UTF8 "
+ "FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\\\' LINES TERMINATED BY '\n' "
+ "STARTING BY '' "
+ "(`tripduration`, `starttime`, `stoptime`, `start_station_id`, `start_station_name`, "
+ "`start_station_latitude`, `start_station_longitude`, `end_station_id`, `end_station_name`, "
+ "`end_station_latitude`, `end_station_longitude`, `bike_id`, `user_type`, `birth_year`, `gender`)";

private static final SimpleDateFormat sdfDateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws IOException, SQLException {
String properties = args[0];
int batchSize = Integer.parseInt(args[1]);
Supplier<Connection> connectionSuplier = new ConnectionProvider(properties);
try (Connection connection = connectionSuplier.get()) {
TableHelper.createTable(connection);

CityBikeParser<TripEntity> parser = new CityBikeParser<>(() -> new TripEntity());
CityBikeReader<TripEntity> reader = new CityBikeReader<>("/tmp", str -> parser.parse(str));

com.mysql.jdbc.Connection unwrapped = connection.unwrap(com.mysql.jdbc.Connection.class);
unwrapped.setAllowLoadLocalInfile(true);

BenchmarkMeter meter = new BenchmarkMeter(JpaSimpleInsert.class, properties, batchSize);
meter.meter(() -> reader.forEachCsvInZip(trips -> {
try {
int cont = 0;
StringBuilder sb = new StringBuilder();
Iterator<TripEntity> iterator = trips.iterator();
while (iterator.hasNext()) {
TripEntity trip = iterator.next();
sb.append(trip.getTripduration()).append("\t");
sb.append(sdfDateTime.format(trip.getStarttime())).append("\t");
sb.append(sdfDateTime.format(trip.getStoptime())).append("\t");
sb.append(trip.getStartStationId()).append("\t");
sb.append(trip.getStartStationName()).append("\t");
sb.append(trip.getStartStationLatitude()).append("\t");
sb.append(trip.getStartStationLongitude()).append("\t");
sb.append(trip.getEndStationId()).append("\t");
sb.append(trip.getEndStationName()).append("\t");
sb.append(trip.getEndStationLatitude()).append("\t");
sb.append(trip.getEndStationLongitude()).append("\t");
sb.append(trip.getBikeId()).append("\t");
sb.append(trip.getUserType()).append("\t");
sb.append(nullify(trip.getBirthYear())).append("\t");
sb.append(trip.getGender()).append("\t");
sb.append("\n");
cont++;
if (cont % batchSize == 0) {
Statement statement = (Statement) unwrapped.createStatement();
InputStream is = new ByteArrayInputStream(sb.toString().getBytes());
statement.setLocalInfileInputStream(is);
statement.execute(LOADDATA);
sb.setLength(0);
}
}
} catch (SQLException e) {
e.printStackTrace();
}})
);

}
}

private static String nullify(Integer value) {
if (value==null) {
return "\\N";
}
return value.toString();
}

}
@@ -0,0 +1,51 @@
/**
* Copyright 2017 Jerónimo López Bezanilla
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jerolba.benchmark.shared;

import java.io.IOException;
import java.io.Reader;

public class StringBuilderReader extends Reader {

private final StringBuilder sb;
private int next = 0;

public StringBuilderReader(StringBuilder sb) {
this.sb = sb;
}

@Override
public int read(char cbuf[], int off, int len) throws IOException {
if (off < 0 || off > cbuf.length || len < 0 || off + len > cbuf.length) {
throw new IndexOutOfBoundsException("off=" + off + " len=" + len + " cbuf.length=" + cbuf.length);
}
if (len == 0) {
return 0;
}
if (next >= sb.length()) {
return -1;
}
int n = Math.min(sb.length() - next, len);
sb.getChars(next, next + n, cbuf, off);
next += n;
return n;
}

@Override
public void close() throws IOException {
sb.setLength(0);
}
}

0 comments on commit 44a8090

Please sign in to comment.