In [16]:
def compute_fit(row):
    """Fit a gamma distribution to one station's ride durations.

    Used as the body of a ``beam.Map`` step. It takes one row produced by the
    BigQuery query and returns a dict shaped for the ``station_stats`` table.

    Args:
        row: Mapping with ``'start_station_id'`` and ``'duration_array'``
            (the durations collected for that station).

    Returns:
        dict with keys ``station_id``, ``ag``, ``bg``, ``cg``. ``ag``, ``bg``
        and ``cg`` are the shape, location and scale estimates returned by
        ``scipy.stats.gamma.fit``, in that order. When the station has no
        durations (empty or ``None`` array), the three parameters are
        ``None``. They are written as NULL instead of raising, which would
        fail the whole pipeline.
    """
    # Imported inside the function so the name is available on Beam workers
    # without relying on the notebook's main session being pickled.
    from scipy import stats

    durations = row['duration_array']
    result = {
        'station_id': row['start_station_id'],
        'ag': None,
        'bg': None,
        'cg': None,
    }
    # gamma.fit raises on empty input; use len() rather than truthiness so
    # numpy arrays are handled too.
    if durations is None or len(durations) == 0:
        return result

    shape, loc, scale = stats.gamma.fit(durations)
    result['ag'] = shape
    result['bg'] = loc
    result['cg'] = scale
    return result

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholders: replace with your GCP project, a GCS bucket you can write to,
# and the region to use.
PROJECT = 'project'
BUCKET = 'bucket'
REGION = 'europe-west1'

opts = PipelineOptions(
    flags=[],
    project=PROJECT,
    job_name='df-job',
    temp_location=f'gs://{BUCKET}/temp',
    region=REGION,
)

# Execute locally with the DirectRunner.
RUNNER = 'DirectRunner'

# One row per start station, with that station's ride durations collected into
# an array. IGNORE NULLS avoids the error BigQuery raises when a result array
# contains a NULL element.
query = """
    #standardSQL
    SELECT start_station_id, ARRAY_AGG(duration IGNORE NULLS) AS duration_array
    FROM `bigquery-public-data.london_bicycles.cycle_hire`
    GROUP BY start_station_id
"""

# Field names must match the keys returned by compute_fit():
# station_id, ag, bg, cg.
# NOTE(review): station_id is declared STRING; confirm this matches the type of
# start_station_id in the source table.
STATION_STATS_SCHEMA = 'station_id:string,ag:FLOAT64,bg:FLOAT64,cg:FLOAT64'

with beam.Pipeline(RUNNER, options=opts) as p:
    (p
        # ReadFromBigQuery is already a PTransform; no beam.io.Read wrapper needed.
        | 'read_bq' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
        | 'compute_fit' >> beam.Map(compute_fit)
        | 'write_bq' >> beam.io.WriteToBigQuery(
            'ch04.station_stats',
            schema=STATION_STATS_SCHEMA,
            # These are the defaults, spelled out: re-running this cell appends
            # another set of rows. Use WRITE_TRUNCATE to make it idempotent.
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )