Skip to content
This repository has been archived by the owner on Nov 8, 2021. It is now read-only.

Commit

Permalink
Improve CSV checking and encoding support, improve documentation (#17)
Browse files Browse the repository at this point in the history
* Improve CSV checking and encoding support, improve documentation

* fix isort

* Add chardet for encoding detection and add lots of logging

* Fix linting errors
  • Loading branch information
villebro committed Nov 6, 2019
1 parent 8e56ed0 commit cb7b674
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 34 deletions.
26 changes: 13 additions & 13 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ As part of the task data quality issues are logged in a separate table
echo "select * from fact_customer_dq;" | sqlite3 --column --header target.db
```

|report_date|customer_id|rowid|source|priority|category|column_name|message|
| --- | --- | --- | --- | --- | --- | --- | --- |
2019-06-30|Terminator|2019-11-06 05:59:52.380884|source|high|incorrect|blood_group|Invalid blood group: Liquid Metal
2019-06-30|Terminator|2019-11-06 05:59:52.380825|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|Terminator|2019-11-06 05:59:52.380767|source|high|incorrect|birthdate|Birthdate in future: 2095-01-01
2019-06-30|Peter Impossible|2019-11-06 05:59:52.380575|source|high|incorrect|blood_group|Invalid blood group: X+
2019-06-30|Peter Impossible|2019-11-06 05:59:52.380516|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|Peter Impossible|2019-11-06 05:59:52.380459|source|high|incorrect|birthdate|Cannot parse birthdate: 1980-13-01
2019-06-30|Mary Null|2019-11-06 05:59:52.380341|source|medium|missing|blood_group|Blood group undefined in customer blood group table
2019-06-30|Mary Null|2019-11-06 05:59:52.380280|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|Mary Null|2019-11-06 05:59:52.380219|source|medium|missing|birthdate|Missing birthdate
2019-06-30|John Connor|2019-11-06 05:59:52.378454|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|John Connor|2019-11-06 05:59:52.378385|source|high|incorrect|birthdate|Birthdate in future: 2080-01-01
|report_date|customer_id|source|priority|category|column_name|message|
| --- | --- | --- | --- | --- | --- | --- |
2019-06-30|Terminator|source|high|incorrect|blood_group|Invalid blood group: Liquid Metal
2019-06-30|Terminator|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|Terminator|source|high|incorrect|birthdate|Birthdate in future: 2095-01-01
2019-06-30|Peter Impossible|source|high|incorrect|blood_group|Invalid blood group: X+
2019-06-30|Peter Impossible|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|Peter Impossible|source|high|incorrect|birthdate|Cannot parse birthdate: 1980-13-01
2019-06-30|Mary Null|source|medium|missing|blood_group|Blood group undefined in customer blood group table
2019-06-30|Mary Null|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|Mary Null|source|medium|missing|birthdate|Missing birthdate
2019-06-30|John Connor|transform|medium|missing|age|Age is undefined due to undefined birthdate
2019-06-30|John Connor|source|high|incorrect|birthdate|Birthdate in future: 2080-01-01
4 changes: 4 additions & 0 deletions example/run_example.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import logging
from datetime import date

from tasks.fact_customer_task import FactCustomerTask
from tasks.init_source_task import InitSourceTask

if __name__ == "__main__":
logger = logging.getLogger("sqltask")
logger.setLevel(logging.DEBUG)

# create initial data used by main task
task = InitSourceTask()
task.execute()
Expand Down
5 changes: 2 additions & 3 deletions example/tasks/fact_customer_task.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# flake8: noqa: E501
from datetime import date, datetime
import os
from datetime import date, datetime
from typing import cast

from sqlalchemy.schema import Column
from sqlalchemy.types import Date, DateTime, Integer, String

from sqltask.base import dq
from sqltask.base.exceptions import TooFewRowsException
from sqltask.base.table import DqTableContext, DqOutputRow
from sqltask.base.table import DqOutputRow, DqTableContext
from sqltask.sources.csv import CsvLookupSource
from sqltask.sources.sql import SqlLookupSource, SqlRowSource

Expand All @@ -34,7 +34,6 @@ def __init__(self, report_date: date):
comment="The customer table",
timestamp_column_name="etl_timestamp",
batch_params={"report_date": report_date},
dq_info_column_names=["etl_timestamp"],
))

# Define the main query used to populate the target table
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
alabaster==0.7.12 # via sphinx
babel==2.7.0 # via sphinx
certifi==2019.9.11 # via requests
chardet==3.0.4 # via requests
chardet==3.0.4
click==7.0 # via pip-tools
codecov==2.0.15
coverage==4.5.4 # via codecov
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
#
# pip-compile --output-file=requirements.txt setup.py
#
chardet==3.0.4
sqlalchemy==1.3.10
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
packages=find_packages(),
install_requires=[
'sqlalchemy',
'chardet',
],
license='MIT',
classifiers=[
Expand Down
2 changes: 1 addition & 1 deletion sqltask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqltask.base.row_source import BaseRowSource
from sqltask.base.table import BaseOutputRow, BaseTableContext

__version__ = '0.4.4'
__version__ = '0.4.5'

# initialize logging
logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion sqltask/base/lookup_source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, cast, Dict, Optional, Sequence, Tuple
from typing import Any, Dict, Optional, Sequence, Tuple, cast

from sqltask.base.row_source import BaseRowSource

Expand Down
7 changes: 7 additions & 0 deletions sqltask/base/row_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@


class BaseRowSource:
"""
Base class for data sources that return iterable rows. A row from a BaseRowSource
can be any Mapping from a key (=column name) to a value (=cell value) that can
be referenced as follows:
>>> for row in rows:
...     column_value = row["column_name"]
"""
def __init__(self, name: Optional[str] = None):
self.name = name

Expand Down
55 changes: 43 additions & 12 deletions sqltask/sources/csv.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import csv
from typing import List, Optional, Sequence
import logging
from typing import Optional, Sequence

from sqltask.base.lookup_source import BaseLookupSource
from sqltask.base.row_source import BaseRowSource
from sqltask.utils.file import detect_encode

logger = logging.getLogger(__name__)


class CsvRowSource(BaseRowSource):
"""
Row source that reads from a CSV file.
Row source that reads from a CSV file. Expects the first row to contain column
names, with subsequent rows containing column values in the same order.
"""

def __init__(self,
Expand All @@ -25,24 +30,50 @@ def __init__(self,
super().__init__(name)
self.file_path = file_path
self.delimiter = delimiter

if encoding is None:
logger.debug(
f"Autodetecting encoding for CSV row source: "
f"{name or file_path or '<undefined>'}"
)
result = detect_encode(file_path)
encoding = result["encoding"]
logger.debug(f"Detected file encoding: {encoding}")
self.encoding = encoding

# populate column names
self.columns: List[str] = []
with open(self.file_path, newline="", encoding=encoding) as csvfile:
csvreader = csv.reader(csvfile, delimiter=self.delimiter)
row = next(csvreader)
for column in row:
self.columns.append(column)
def __repr__(self):
    """Human-readable identifier: the name if set, else the file path, else a placeholder."""
    for label in (self.name, self.file_path):
        if label:
            return label
    return '<undefined>'

def __iter__(self):
    """
    Iterate over the data rows in the CSV file.

    The first row of the file is treated as the header; each subsequent
    row is yielded as a dict mapping header column name to cell value.

    :raises ValueError: if a data row has a different number of columns
        than the header row.
    """
    columns = []
    row_number = 0
    logger.debug(f"Start reading CSV row source: {self}")
    # Open with the encoding resolved in __init__ (explicit or autodetected).
    # Previously the file was opened with the platform default encoding, so
    # the detected encoding was stored but never actually used.
    with open(self.file_path, newline="", encoding=self.encoding) as csvfile:
        csvreader = csv.reader(csvfile, delimiter=self.delimiter)
        for in_row in csvreader:
            row_number += 1

            # read column names on first row
            if row_number == 1:
                columns = list(in_row)
                continue

            if len(in_row) != len(columns):
                # ValueError is a subclass of Exception, so callers that
                # caught the previous generic Exception are unaffected.
                raise ValueError(
                    f"Error reading row {row_number} of CSV file {self}: "
                    f"Expected {len(columns)} columns, found {len(in_row)}")
            yield {columns[i]: col for i, col in enumerate(in_row)}
    # Clamp at 0 so a completely empty file does not report "-1 rows"
    # (row_number - 1 discounts the header row).
    logger.info(
        f"Finished reading {max(row_number - 1, 0)} rows for CSV row source: "
        f"{self}"
    )


class CsvLookupSource(BaseLookupSource):
Expand Down
13 changes: 10 additions & 3 deletions sqltask/sources/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ def __init__(
database=self.database, schema=self.schema
)

def __repr__(self):
    """Human-readable identifier used in log messages."""
    if self.name:
        return self.name
    return "<undefined>"

def __iter__(self) -> Iterator[RowProxy]:
    """
    Execute the query and yield the resulting rows one at a time.
    """
    logger.debug(f"Executing query for SQL row source: {self}")
    rows = self.engine_context.engine.execute(text(self.sql), self.params)
    # Count explicitly instead of reusing the enumerate() loop variable:
    # with `for row_number, row in enumerate(rows)` an empty result set
    # left row_number at its initial 0 and the log reported "1 rows".
    row_count = 0
    for row in rows:
        row_count += 1
        yield row
    logger.debug(
        f"Finished reading {row_count} rows from SQL row source: {self}"
    )


class SqlLookupSource(BaseLookupSource):
Expand All @@ -71,6 +77,7 @@ def __init__(self,
schema: Optional[str] = None,
):
row_source = SqlRowSource(
name=name,
sql=sql,
params=params,
engine_context=engine_context,
Expand Down
22 changes: 22 additions & 0 deletions sqltask/utils/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, Dict

from chardet.universaldetector import UniversalDetector


def detect_encode(file: str) -> Dict[str, Any]:
    """
    Detect the character encoding of a file using chardet's
    UniversalDetector.

    Feeds the file to the detector line by line and stops early once the
    detector has seen enough data to reach a conclusion.

    :param file: path to the file whose encoding should be detected
    :return: chardet result dict (per chardet docs: "encoding",
             "confidence" and "language" keys)
    """
    # No explicit reset() needed: UniversalDetector's constructor already
    # calls reset(); it is only required when reusing a detector instance.
    detector = UniversalDetector()
    with open(file, 'rb') as f:
        for row in f:
            detector.feed(row)
            if detector.done:
                break

    detector.close()
    return detector.result

0 comments on commit cb7b674

Please sign in to comment.