Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitnore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pysql_beam/examples/pipeline.ipynb
pysql_beam/examples/*sql
Binary file added .gitnore.swp
Binary file not shown.
67 changes: 67 additions & 0 deletions .ipynb_checkpoints/README-checkpoint.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
## pysql-beam

### This package is still under development but has been used in few projects in production. This has been tested with dataflow Runner and Direct runner

This package aim to provide Apache_beam io connector for MySQL and Postgres database.


This package provides apache beam io connector for postgres db and mysql db.
This package wil aim to be pure python implementation for both io connector

FYI: This does not uses any jdbc or odbc connector

Requirements:

1. Python>=2.7 or python>= 3.5
2. Apache beam >= 2.10
3. pymysql[rsa]
4. psycopg2-binary


Installation:

1. pip install git+git@github.com:MediaAgility/pysql-beam.git
or
2. pip installl pysql-beam


Current functionality:

1. Read from MySQL database by passing either table name or sql query
2. Read from Postgres database by passing either table name or sql query


Reference Guide:

1. Java IO connector for the same:
https://github.com/spotify/dbeam

2. How to write io connector for Apache Beam:
https://beam.apache.org/documentation/io/developing-io-overview/

https://beam.apache.org/documentation/io/developing-io-python/

Usage Guide:
```
from pysql_beam.sql_io.sql import ReadFromSQL

....
ReadFromSQL(host=self.options.host, port=self.options.port,
username=self.options.username, password=self.options.password,
databse=self.options.database,
query=self.options.source_query,
wrapper=PostgresWrapper,
batch=100000)

```
Examples:

For mysql:
`python cloud_sql_to_file.py --host localhost --port 3306 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output YOUR_OUTPUT_FLLE`

For postgres:
`python cloud_sql_to_file.py --host localhost --port 5432 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output YOUR_OUTPUT_FLLE`


contribution:
You can contribute to this package by raising bugs or sending pull requests
5 changes: 5 additions & 0 deletions .ipynb_checkpoints/requirements-checkpoint.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apache-beam>=2.10<=2.12.0
psycopg2-binary==2.8.2
pymysql[rsa]==0.9.3 # for cryptography passwords
requests>=2.22.0
pyodbc
30 changes: 17 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,32 @@
This package aim to provide Apache_beam io connector for MySQL and Postgres database.


This package provides apache beam io connector for postgres db and mysql db.
This package wil aim to be pure python implementation for both io connector
This package provides apache beam io connectors for postgres db, mssql db and mysql db.
This package is a python implementation for those 3 io connectors

FYI: This does not uses any jdbc or odbc connector
FYI: it uses a pyodbc connector for the mssql implementation, but not for the other two connectors

Requirements:

1. Python>=2.7 or python>= 3.5
2. Apache beam >= 2.10
3. pymysql[rsa]
4. psycopg2-binary
5. pyodbc


Installation:

1. pip install git+git@github.com:MediaAgility/pysql-beam.git
or
2. pip installl pysql-beam
pip install git+git@github.com:jac2130/pysql-beam.git
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be git@github.com:yesdeepakverma/pysql-beam.git




Current functionality:

1. Read from MySQL database by passing either table name or sql query
2. Read from Postgres database by passing either table name or sql query

3. Read from MSSQL database by passing either table name or squl query
4. Write to BigQuery

Reference Guide:

Expand All @@ -44,21 +45,24 @@ Usage Guide:
from pysql_beam.sql_io.sql import ReadFromSQL

....
ReadFromSQL(host=self.options.host, port=self.options.port,
username=self.options.username, password=self.options.password,
databse=self.options.database,
query=self.options.source_query,
ReadFromSQL(host=options.host, port=options.port,
username=options.username, password=options.password,
databse=options.database,
query=options.source_query,
wrapper=PostgresWrapper,
batch=100000)

```
Examples:

For mysql:
`python cloud_sql_to_file.py --host localhost --port 3306 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output YOUR_OUTPUT_FLLE`
`python cloud_sql_to_bigquery.py --host localhost --port 3306 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output_table 'MyProject:MyDataset.MyTable' --temp_location "gs://MyBucket/tmp"`

For postgres:
`python cloud_sql_to_file.py --host localhost --port 5432 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output YOUR_OUTPUT_FLLE`
`python cloud_sql_to_bigquery.py --host localhost --port 5432 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output_table 'MyProject:MyDataset.MyTable' --temp_location "gs://MyBucket/tmp"`

For mssql:
`python cloud_sql_to_bigquery.py --host localhost --port 1433 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --query 'SELECT * from MyTable' --output_table 'MyProject:MyDataset.MyTable' --temp_location "gs://MyBucket/tmp"`


contribution:
Expand Down
Binary file added pysql_beam/__pycache__/__init__.cpython-37.pyc
Binary file not shown.
109 changes: 109 additions & 0 deletions pysql_beam/examples/.ipynb_checkpoints/cloud_sql_to_file-checkpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
This read some data from cloud sql mysql database
and write to file
Command to run this script:
python cloud_sql_to_file.py --host localhost --port 3306 --database SECRET_DATABASE \
--username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output YOUR_OUTPUT_FLLE
For postgres sql:
python cloud_sql_to_file.py --host localhost --port 5432 --database SECRET_DATABASE \
--username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output YOUR_OUTPUT_FLLE
"""
import sys
import json
sys.path.insert(0, '/home/jupyter/shapiro-johannes/pysql-beam/pysql-beam')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these 2 lines ?

sys.path.insert(0, '/home/jupyter/shapiro-johannes/pysql-beam')
import logging
import apache_beam as beam
from pysql_beam.sql_io.sql import SQLSource, SQLWriter, ReadFromSQL
from pysql_beam.sql_io.wrapper import MySQLWrapper, MSSQLWrapper, PostgresWrapper
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

def log(row, level="debug"):
getattr(logging, level.lower())(row)
return row


class SQLOptions(PipelineOptions):

@classmethod
def _add_argparse_args(cls, parser):
# parser.add_value_provider_argument('--host', dest='host', required=False)
# parser.add_value_provider_argument('--port', dest='port', required=False)
# parser.add_value_provider_argument('--database', dest='database', required=False)
# parser.add_value_provider_argument('--table', dest='table', required=False)
# parser.add_value_provider_argument('--query', dest='query', required=False)
# parser.add_value_provider_argument('--username', dest='username', required=False)
# parser.add_value_provider_argument('--password', dest='password', required=False)
#parser.add_value_provider_argument('--db_type', dest='db_type', default="mssql", required=False, help="the type of database; allowed are 'mssql', 'mysql' and 'postgres'")
Comment on lines +37 to +44
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove these if not required ?

parser.add_value_provider_argument('--host', dest='host', default="localhost")
parser.add_value_provider_argument('--port', dest='port', default="3306")
parser.add_value_provider_argument('--database', dest='database', default="dverma")
parser.add_value_provider_argument('--query', dest='query', default="SELECT * FROM dverma.userPointsLedger;")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be remove dverma database value from here and use something which is not user specific!

parser.add_value_provider_argument('--username', dest='username', default="dverma")
parser.add_value_provider_argument('--password', dest='password', default="Deepak@123")
#parser.add_value_provider_argument('--output', dest='output', default="abc", help="output file name")
parser.add_argument('--output_table', dest='output_table', required=True,
help=('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))



def parse_json(line):
'''Converts line from PubSub back to dictionary
'''
record = json.loads(line)
return record

def run():
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(SQLOptions)
#options.view_as(SetupOptions).save_main_session = True
#temp_location = options.view_as(GoogleCloudOptions).temp_location
#print("Here!", temp_location)
pipeline = beam.Pipeline(options=options)



mysql_data = (pipeline | ReadFromSQL(host=options.host, port=options.port,
username=options.username, password=options.password,
database=options.database, query=options.query,
#wrapper={'mssql': MSSQLWrapper, 'mysql': MySQLWrapper, 'postgres': PostgresWrapper}[options.db_type],
wrapper=MSSQLWrapper,
# wrapper=PostgresWrapper
#
)
#| 'Parse' >> beam.Map(parse_json)
| 'Write to Table' >> WriteToBigQuery(
table= options.output_table,
schema = 'SCHEMA_AUTODETECT',
write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

#transformed_data = mysql_data | "Transform records" >> beam.Map(transform_records, 'user_id')
# transformed_data | "insert into mysql" >> SQLWriter(options.host, options.port,
# options.username, options.password,
# options.database,
# table='output',
# wrapper=MySQLWrapper, autocommit=True, batch_size=500)
# transformed_data | "insert into postgres" >> SQLWriter(options.host, 5432,
# 'postgres', options.password,
# options.database, table=options.output_table,
# wrapper=PostgresWrapper, autocommit=False, batch_size=500)
#mysql_data | "Log records " >> beam.Map(log) | beam.io.WriteToText(options.output, num_shards=1, file_name_suffix=".json")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove any unnecessary code




pipeline.run().wait_until_finish()


if __name__ == "__main__":
logging.getLogger().setLevel(logging.WARNING)
run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
SELECT DATEFROMPARTS(YEAR(MRRECVH.RECV_DATE),MONTH(MRRECVH.RECV_DATE), DAY(MRRECVH.RECV_DATE)) as 'Date', MRRECVH.RECV_DATE, MRRECVD.CONTROL, SUBSTRING(MRRECVD.CONTROL, 9, 2) as 'ADJ',
MRRECVD.LINE_TYPE,
MRRECVD.SEQUENCE,
MRRECVD.ACCOUNT,
MRRECVD.COMMODITY, MRRECVD.YARD, MRRECVD.DESCRIPTION,
MRRECVD.SETL_NET, MRRECVD.SETL_PRICE, MRRECVD.SETL_UM, MRRECVD.SETL_AMOUNT,
PPPrice.EFFECTIVE_DATE, PPPrice.EXPIRE_DATE, PPPrice.DESCRIPTION,
PPPrice.MARKET, PPPrice.MARKET_COMM, PPPrice.MARKET_BASE, PPPrice.DIFF_AMOUNT1, PPPrice.DIFF_PERCENT, PPPrice.DIFF_AMOUNT2, PPPrice.PRICE, PPPrice.PRICE_UM, PPPrice.COMMENT,
PPMKTD.EFFECTIVE_DATE, PPMKTD.LOW_PRICE, PPMKTD.HIGH_PRICE, PPMKTD.PRICE_UM, PPMKTD.DESCRIPTION
FROM MRRECVD INNER JOIN MRRECVH
on
MRRECVD.COMPANY = MRRECVH.COMPANY and
MRRECVD.DIVISION = MRRECVH.DIVISION and
MRRECVD.CONTROL = MRRECVH.CONTROL
left outer join PPPrice
on
MRRECVD.COMPANY = PPPrice.COMPANY and
MRRECVD.DIVISION = PPPrice.DIVISION and
MRRECVD.ACCOUNT = PPPrice.ACCOUNT and
MRRECVD.COMMODITY = PPPrice.COMMODITY and
MRRECVD.YARD = PPPrice.YARD and
MRRECVH.RECV_DATE between PPPrice.EFFECTIVE_DATE AND COALESCE(PPPrice.EXPIRE_DATE, '9999-12-31')
left outer join PPMKTD
on
MRRECVD.COMPANY = PPMKTD.COMPANY and
MRRECVD.DIVISION = PPMKTD.DIVISION and
PPPrice.MARKET_COMM = PPMKTD.COMMODITY and
PPPrice.MARKET = PPMKTD.MARKET and
DATEFROMPARTS(YEAR(MRRECVH.RECV_DATE),MONTH(MRRECVH.RECV_DATE),1) = PPMKTD.EFFECTIVE_DATE
where
MRRECVH.RECV_DATE > '2019-01-01'
order by CONTROL, LINE_TYPE, SEQUENCE

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 4
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT COMPANY, DIVISION, MARKET, COMMODITY, EFFECTIVE_DATE, ISSUE_DATE, DESCRIPTION, LOW_PRICE, HIGH_PRICE, PRICE_UM, CREATEDATE, CREATETIME, CREATEUSER, CREATESTATION, LASTDATE, LASTTIME, LASTUSER, LASTSTATION
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this file? This might be something specific to your personal or professional work.

FROM SSC.dbo.PPMKTD
where EFFECTIVE_DATE >= '2019-01-01';
68 changes: 68 additions & 0 deletions pysql_beam/examples/MY_QUERY
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
SELECT DATEFROMPARTS(YEAR([MRRECVH].RECV_DATE),MONTH([MRRECVH].RECV_DATE), DAY([MRRECVH].RECV_DATE)) as 'Date', [MRRECVH].RECV_DATE, [MRRECVD].CONTROL, SUBSTRING([MRRECVD].CONTROL, 9, 2) as 'ADJ',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this file ?

[MRRECVD].LINE_TYPE,
[MRRECVD].[SEQUENCE],
[MRRECVD].ACCOUNT,
[MRRECVD].COMMODITY, [MRRECVD].YARD, [MRRECVD].DESCRIPTION,
[MRRECVD].SETL_NET, [MRRECVD].SETL_PRICE, [MRRECVD].SETL_UM, [MRRECVD].SETL_AMOUNT,
[PPPrice].EFFECTIVE_DATE, [PPPrice].EXPIRE_DATE, [PPPrice].DESCRIPTION,
[PPPrice].MARKET, [PPPrice].MARKET_COMM, [PPPrice].MARKET_BASE, [PPPrice].DIFF_AMOUNT1, [PPPrice].DIFF_PERCENT, [PPPrice].DIFF_AMOUNT2, [PPPrice].PRICE, [PPPrice].PRICE_UM, [PPPrice].COMMENT,
'' as "PPMKTD",
[PPMKTD].EFFECTIVE_DATE, [PPMKTD].LOW_PRICE, [PPMKTD].HIGH_PRICE, [PPMKTD].PRICE_UM, [PPMKTD].DESCRIPTION
FROM shapiro.dbo.MRRECVD INNER JOIN shapiro.dbo.MRRECVH
on
[MRRECVD].COMPANY = [MRRECVH].COMPANY and
[MRRECVD].DIVISION = [MRRECVH].DIVISION and
[MRRECVD].CONTROL = [MRRECVH].CONTROL
left outer join shapiro.dbo.PPPrice
on
[MRRECVD].COMPANY = [PPPrice].COMPANY and
[MRRECVD].DIVISION = [PPPrice].DIVISION and
[MRRECVD].ACCOUNT = [PPPrice].ACCOUNT and
[MRRECVD].COMMODITY = [PPPrice].COMMODITY and
[MRRECVD].YARD = [PPPrice].YARD and
[MRRECVH].RECV_DATE between [PPPrice].EFFECTIVE_DATE AND COALESCE([PPPrice].EXPIRE_DATE, '9999-12-31')
left outer join shapiro.dbo.PPMKTD
on
[MRRECVD].COMPANY = [PPMKTD].COMPANY and
[MRRECVD].DIVISION = [PPMKTD].DIVISION and
[PPPrice].MARKET_COMM = [PPMKTD].COMMODITY and
[PPPrice].MARKET = [PPMKTD].MARKET and
DATEFROMPARTS(YEAR([MRRECVH].RECV_DATE),MONTH([MRRECVH].RECV_DATE),1) = [PPMKTD].EFFECTIVE_DATE
where
[MRRECVH].RECV_DATE > '2019-01-01'
order by CONTROL, LINE_TYPE, [SEQUENCE]

SELECT DATEFROMPARTS(YEAR([MRRECVH].RECV_DATE),MONTH([MRRECVH].RECV_DATE), DAY([MRRECVH].RECV_DATE)) as 'Date', [MRRECVH].RECV_DATE, [MRRECVD].CONTROL, SUBSTRING([MRRECVD].CONTROL, 9, 2) as 'ADJ',
[MRRECVD].LINE_TYPE,
[MRRECVD].[SEQUENCE],
[MRRECVD].ACCOUNT,
[MRRECVD].COMMODITY, [MRRECVD].YARD, [MRRECVD].DESCRIPTION,
[MRRECVD].SETL_NET, [MRRECVD].SETL_PRICE, [MRRECVD].SETL_UM, [MRRECVD].SETL_AMOUNT,
[PPPrice].EFFECTIVE_DATE, [PPPrice].EXPIRE_DATE, [PPPrice].DESCRIPTION,
[PPPrice].MARKET, [PPPrice].MARKET_COMM, [PPPrice].MARKET_BASE, [PPPrice].DIFF_AMOUNT1, [PPPrice].DIFF_PERCENT, [PPPrice].DIFF_AMOUNT2, [PPPrice].PRICE, [PPPrice].PRICE_UM, [PPPrice].COMMENT,
'' as "PPMKTD",
[PPMKTD].EFFECTIVE_DATE, [PPMKTD].LOW_PRICE, [PPMKTD].HIGH_PRICE, [PPMKTD].PRICE_UM, [PPMKTD].DESCRIPTION
FROM shapiro.dbo.MRRECVD INNER JOIN shapiro.dbo.MRRECVH
on
[MRRECVD].COMPANY = [MRRECVH].COMPANY and
[MRRECVD].DIVISION = [MRRECVH].DIVISION and
[MRRECVD].CONTROL = [MRRECVH].CONTROL
left outer join shapiro.dbo.PPPrice
on
[MRRECVD].COMPANY = [PPPrice].COMPANY and
[MRRECVD].DIVISION = [PPPrice].DIVISION and
[MRRECVD].ACCOUNT = [PPPrice].ACCOUNT and
[MRRECVD].COMMODITY = [PPPrice].COMMODITY and
[MRRECVD].YARD = [PPPrice].YARD and
[MRRECVH].RECV_DATE between [PPPrice].EFFECTIVE_DATE AND COALESCE([PPPrice].EXPIRE_DATE, '9999-12-31')
left outer join shapiro.dbo.PPMKTD
on
[MRRECVD].COMPANY = [PPMKTD].COMPANY and
[MRRECVD].DIVISION = [PPMKTD].DIVISION and
[PPPrice].MARKET_COMM = [PPMKTD].COMMODITY and
[PPPrice].MARKET = [PPMKTD].MARKET and
DATEFROMPARTS(YEAR([MRRECVH].RECV_DATE),MONTH([MRRECVH].RECV_DATE),1) = [PPMKTD].EFFECTIVE_DATE
where
[MRRECVH].RECV_DATE > '2019-01-01'
order by CONTROL, LINE_TYPE, [SEQUENCE]

Binary file not shown.
Loading