In [2]:
import google.cloud.dataflow as df

In [3]:
p = df.Pipeline('DirectPipelineRunner')

In [4]:
p | df.Create(['hello', 'world']) | df.io.Write(df.io.TextFileSink('./test.txt'))

<PValue transform=<_NativeWrite(PTransform) label=[native_write]> at 0x1061cd850>

In [5]:
p.run()

<google.cloud.dataflow.runners.direct_runner.DirectPipelineResult at 0x106188e90>

In [6]:
!more ./test.txt

hello=
world
[K[?1l>.txt (END)[m[K

In [7]:
input_file = './datain'
output_file = './dataout'

In [8]:
def generate_data(outfile, size):
    import json
    import time
    product = {'milk':20, 'soft_drink_300ml':35, 'soft_drink_500ml':45, 'soft_drink_1l':60, 'beer':80, 'snacks':50, 'sweets':100, 'candy':30,'rice':60, 'dhal':70,'shampoo':96, 'soap':20,'face_wash':120,'chicken':200,'mutton':600, 'shirt':400, 'pant':600, 'shorts':250, 'salvar':700, 'shoe':300,'flipflop':100}
    with open(outfile, 'wt') as f:  
        for _ in xrange(size):
            for num, ix in enumerate(product):
                f.write('%s\n' % json.dumps(
                        {'ProductID': 1 + num, 'ProductName': ix, 
                         'Price': product[ix], 'Timestamp': time.time()}))

generate_data(input_file, 100)
!head -3 $input_file

{"Timestamp": 1528437211.152227, "Price": 60, "ProductName": "soft_drink_1l", "ProductID": 1}
{"Timestamp": 1528437211.152303, "Price": 250, "ProductName": "shorts", "ProductID": 2}
{"Timestamp": 1528437211.152328, "Price": 120, "ProductName": "face_wash", "ProductID": 3}


In [9]:
import google.cloud.dataflow as df

In [10]:
p = df.Pipeline('DirectPipelineRunner')

In [11]:
(p 
   |df.io.Read(df.io.TextFileSource(input_file))
   | df.io.Write(df.io.TextFileSink(output_file)))

<PValue transform=<_NativeWrite(PTransform) label=[native_write]> at 0x106188c50>

In [12]:
p.run()

<google.cloud.dataflow.runners.direct_runner.DirectPipelineResult at 0x106638b90>

In [13]:
!head -3 $output_file

{"Timestamp": 1528437211.152227, "Price": 60, "ProductName": "soft_drink_1l", "ProductID": 1}
{"Timestamp": 1528437211.152303, "Price": 250, "ProductName": "shorts", "ProductID": 2}
{"Timestamp": 1528437211.152328, "Price": 120, "ProductName": "face_wash", "ProductID": 3}


In [14]:
graph = (
    df.io.Read(df.io.TextFileSource(input_file))
    | df.io.Write(df.io.TextFileSink(output_file)))

In [15]:
p = df.Pipeline('DirectPipelineRunner')
p | graph
p.run()
!head -3 $output_file

{"Timestamp": 1528437211.152227, "Price": 60, "ProductName": "soft_drink_1l", "ProductID": 1}
{"Timestamp": 1528437211.152303, "Price": 250, "ProductName": "shorts", "ProductID": 2}
{"Timestamp": 1528437211.152328, "Price": 120, "ProductName": "face_wash", "ProductID": 3}


In [16]:
from IPython.core.magic import register_cell_magic
@register_cell_magic
def dataflow_run(line, cell):
    p = df.Pipeline('DirectPipelineRunner')
    p | eval(cell)
    p.run()
    !head -3 $output_file

In [17]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.io.Write(df.io.TextFileSink(output_file)))

{"Timestamp": 1528437211.152227, "Price": 60, "ProductName": "soft_drink_1l", "ProductID": 1}
{"Timestamp": 1528437211.152303, "Price": 250, "ProductName": "shorts", "ProductID": 2}
{"Timestamp": 1528437211.152328, "Price": 120, "ProductName": "face_wash", "ProductID": 3}


In [18]:
def parse_record(e):
    import json
    r = json.loads(e)
    return r['ProductID'], r['Price']

In [19]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.Map(parse_record)
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 60)
(2, 250)
(3, 120)


In [20]:
def parse_record(e):
    import json
    r = json.loads(e)
    yield r['ProductID'], r['Price']

In [21]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record)
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 60)
(2, 250)
(3, 120)


In [22]:
class ParseRecordDoFn(df.DoFn):
    def process(self, context):
        import json
        r = json.loads(context.element)
        yield r['ProductID'], r['Price']

In [23]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.ParDo(ParseRecordDoFn())
 | df.io.Write(df.io.TextFileSink(output_file)))

(1, 60)
(2, 250)
(3, 120)


In [25]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.ParDo(ParseRecordDoFn())
 | df.GroupByKey()
 | df.io.Write(df.io.TextFileSink(output_file)))

(7, [50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50])
(12, [30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30])
(8, [20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 20, 

In [25]:
filtered = [3, 4, 5, 6, 7, 8, 9, 10]

def parse_record(e, filtered):
    import json
    r = json.loads(e)
    if int(r['ProductID']) not in filtered:
        yield r['ProductID'], r['Price']

In [26]:
%%dataflow_run
(df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record, filtered)
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

(2, 100000)
(1, 100000)


In [27]:
from google.cloud.dataflow import pvalue

In [28]:
p = df.Pipeline('DirectPipelineRunner')
filtered_pcoll = p | df.Create(filtered)
(p 
 | df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record, pvalue.AsIter(filtered_pcoll))
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

p.run()
!head -3 $output_file

(2, 100000)
(1, 100000)


In [29]:
filtered_pcoll

<PCollection transform=<Create(PTransform) label=[Create]> at 0x10cde3ad0>

In [30]:
p = df.Pipeline('DirectPipelineRunner')
pc1 = p | df.io.Read('Read once', df.io.TextFileSource(input_file)) 
pc2 = p | df.io.Read('Read twice', df.io.TextFileSource(input_file)) 

((pc1, pc2) 
 | df.Flatten()
 | df.FlatMap(parse_record, filtered=[])
 | df.CombinePerKey(sum)
 | df.io.Write(df.io.TextFileSink(output_file)))

p.run()
!head -3 $output_file

(7, 200000)
(8, 200000)
(5, 200000)


In [31]:
output_table = 'silviuc-dataflow:demo.silviuc_demo'

In [32]:
p = df.Pipeline('DirectPipelineRunner')
(p 
 | df.io.Read(df.io.TextFileSource(input_file))
 | df.FlatMap(parse_record, pvalue.AsIter(filtered_pcoll))
 | df.CombinePerKey(sum)
 | df.Map(lambda (pr, v): {'ProductID': pr, 'Value': v}) 
 | df.io.Write(df.io.BigQuerySink(
            output_table,
            schema='ProductID:INTEGER, Value:FLOAT', 
            create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))) 

p.run()
print 'Resulting BigQuery table:'
print 'https://bigquery.cloud.google.com/table/%s?pli=1' % output_table

ERROR:root:The gcloud tool was not found.
Traceback (most recent call last):
  File "/anaconda3/envs/python2/lib/python2.7/site-packages/google/cloud/dataflow/internal/auth.py", line 104, in _refresh
    ['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)
  File "/anaconda3/envs/python2/lib/python2.7/site-packages/google/cloud/dataflow/utils/processes.py", line 49, in Popen
    return subprocess.Popen(*args, **kwargs)
  File "/anaconda3/envs/python2/lib/python2.7/subprocess.py", line 394, in __init__
    errread, errwrite)
  File "/anaconda3/envs/python2/lib/python2.7/subprocess.py", line 1047, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory
ERROR:root:Error while visiting Write/native_write


AuthenticationException: The gcloud tool was not found: [Errno 2] No such file or directory