Skip to content
This repository has been archived by the owner on Mar 20, 2022. It is now read-only.

Commit

Permalink
fix(sync): parse schema from input data
Browse files Browse the repository at this point in the history
  • Loading branch information
philbooth committed Feb 13, 2018
1 parent d6f66ed commit fb3437e
Showing 1 changed file with 30 additions and 38 deletions.
68 changes: 30 additions & 38 deletions sync.js
Expand Up @@ -14,28 +14,6 @@ const s3 = require('s3')

fs.unlinkAsync = Promise.promisify(fs.unlink)

// HACK: node-parquet doesn't extract the schema on read yet
const SCHEMA = {
// column: index
app_name: 3,
app_version: 4,
uid: 6,
device_id: 8,
device_os_name: 9,
device_os_version: 10,
device_os_locale: 11,
// Not an absolute timestamp! Milliseconds since process start.
event_timestamp: 12,
event_method: 14,
event_object: 15,
event_map_values: {
keys: 17,
values: 18
},
event_flow_id: 20,
event_device_os: 21
}

const MARKER_PATH = path.resolve('.sync-marker')
const DATE = /^(20[1-9][0-9])-([01][0-9])-([0-3][0-9])$/
const PARQUET_FILE = /\.parquet$/i
Expand Down Expand Up @@ -102,17 +80,31 @@ function getDateParts () {

function readLocalData (fileName) {
const reader = new ParquetReader(fileName)
//console.log(reader.info())
const schema = parseSchema(reader.info().spark_schema)
const rows = reader.rows()
reader.close()
return rows
return { schema, rows }
}

function parseSchema (source, shift = 0) {
return Object.keys(source).reduce((target, key, index) => {
const item = source[key]
const nestedItem = item.key_value
if (nestedItem) {
target[key] = parseSchema(nestedItem, index + shift)
shift += Object.keys(nestedItem).length - 1
} else {
target[key] = index + shift
}
return target
}, {})
}

function processData (rows) {
function processData ({ schema, rows }) {
let eventCount = 0, batch = []

return Promise.all(rows.map(row => {
const event = createEvent(row)
const event = createEvent(schema, row)
if (! event) {
return
}
Expand All @@ -136,22 +128,22 @@ function processData (rows) {
.then(() => eventCount)
}

function createEvent (row) {
const eventType = getEventType(row[SCHEMA.event_method], row[SCHEMA.event_object])
function createEvent (schema, row) {
const eventType = getEventType(row[schema.event_method], row[schema.event_object])
if (! eventType) {
return
}

// serverTime is not at all accurate as an event timing, but it's the best thing we have
const time = getServerTime(row[SCHEMA.event_map_values.keys], row[SCHEMA.event_map_values.values])
const time = getServerTime(row[schema.event_map_values.key], row[schema.event_map_values.value])
if (! time || time < 0) {
return
}

const uid = row[SCHEMA.uid]
const syncFlowId = row[SCHEMA.event_flow_id]
const appName = row[SCHEMA.app_name]
const appVersion = row[SCHEMA.app_version]
const uid = row[schema.uid]
const syncFlowId = row[schema.event_flow_id]
const appName = row[schema.app_name]
const appVersion = row[schema.app_version]

return Object.assign({
event_type: `sync - ${eventType}`,
Expand All @@ -160,15 +152,15 @@ function createEvent (row) {
user_id: uid,
// TODO: include device_id when we have a plan for matching it to the other events
session_id: -1,
insert_id: hash(uid, row[SCHEMA.device_id], syncFlowId, time, row[SCHEMA.event_timestamp], eventType),
insert_id: hash(uid, row[schema.device_id], syncFlowId, time, row[schema.event_timestamp], eventType),
app_version: appVersion,
language: row[SCHEMA.device_os_locale],
language: row[schema.device_os_locale],
event_properties: {
ua_browser: appName,
ua_version: appVersion,
flow_id: syncFlowId
}
}, getOs(row[SCHEMA.device_os_name], row[SCHEMA.device_os_version]))
}, getOs(row[schema.device_os_name], row[schema.device_os_version]))
}

function getEventType (method, object) {
Expand Down Expand Up @@ -316,8 +308,8 @@ function processKeyFromS3 (client, keys, index) {
//return readDataFromS3(client, keys, index)
return downloadFileFromS3(client, keys, index)
.then(fileName => {
const rows = readLocalData(fileName)
return Promise.all([ processData(rows), fs.unlinkAsync(fileName) ])
const data = readLocalData(fileName)
return Promise.all([ processData(data), fs.unlinkAsync(fileName) ])
})
.spread(eventCount =>
processKeyFromS3(client, keys, index + 1)
Expand Down

0 comments on commit fb3437e

Please sign in to comment.