In \[1\]:

    import apache_beam as beam
    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.runners import DataflowRunner
    import google.auth
    import json
    import re

In \[2\]:

    # Configuración de las opciones del pipeline Apache Beam.
    options = pipeline_options.PipelineOptions(flags=[])

    # Establece el proyecto como el proyecto por defecto en su actual entorno de Google Cloud.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()

    # Región de Google Cloud en la cual va a correr Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-east1'

    # Debido a que este notebook viene con una versión construida localmente del SDK
    # de Beam Python, se necesita establecer la opción sdk_location para el Dataflow Runner.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
        beam.version.__version__)

In \[3\]:

    # Carpeta de trabajo en el bucket jrodriguez-test de Google Cloud Storage.
    dataflow_gcs_location = 'gs://jrodriguez-test/dataflow'

In \[4\]:

    # Ubicación de carpetas temporales de Dataflow.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

In \[5\]:

    # Clases que heredan de PTransform, que se usan en un pipeline más adelante junto
    # con otras Ptransform.

    # Lee archivo el archivo json y lo transforma en PCollection
    class ReadData(beam.PTransform):
        
        #Recibe la ruta
        def __init__(self, file_pattern):
            self._file_pattern = file_pattern
        
        # Ejecución. Recibe PCollection.
        def expand(self, pcoll):
            # Inicio de pipeline de ejecución.
            return (pcoll.pipeline
                    # Lectura del archivo plano hacia Beam.
                    | beam.io.ReadFromText(self._file_pattern)
                    # Cargar las filas como json y retorna PCollection.
                    | beam.Map(json.loads))

    # Filtra las filas que tienen valor rssi menor al valor que
    # recibe como parámetro (value). Continúa la ejecución con
    # las que tienen valor rssi mayor o igual.
    class FilterRssi(beam.PTransform):
        
        def __init__(self, value):
            self.value = value
        # Recibe la anterior PCollection en el pipeline
        def expand(self, pcoll):
            return (pcoll
                    | beam.Filter(lambda row: row['rssi'] >= self.value))
        
    # Deja sólo los id client y luego selecciona los distintos ids.
    class Distinct(beam.PTransform):

        def expand(self, pcoll):
            return (pcoll
                    | beam.Map(lambda x: x.get('client'))
                    | beam.transforms.util.Distinct())
        
    # Transforma cada fila del PCollection que recibe a la forma de
    # tupla (client, [lista de tiempos de cada visita])
    class Visits(beam.PTransform):
        
        # Recibe el umbral de visita
        def __init__(self, rssiVisit):
            self.rssiVisit = rssiVisit

        # Función aplicada a cada fila del PCollection como map
        # (en la función expand), que retorna la lista de tiempos
        # de cada visita de client.
        # Recibe una lista de tuplas de un client:
        # [(rssi, date), (rssi, date), ...]
        def getVisits(self, dataList):
            
            dataList = list(dataList)
            # Se ordena la lista por date
            dataList.sort(key=lambda rssi_date: rssi_date[1])
        
            # Lista de listas de visitas
            visitsList = []
            # Lista de tuplas
            visit = []
            # Se itera la lista de tuplas.
            # Para una iteración se compara con date de la anterior,
            # para eso se usa previousDate. Se inicia con el primer valor.
            previousDate = dataList[0][1]
            for rssi_date in dataList:
                # Si la diferencia del tiempo anterior con el actual
                # es menor de media hora. En la primera iteración es 0,
                # por tanto ingresa al if.
                if rssi_date[1]-previousDate < 1800000:
                    # Carga un registro de la visita como tupla (rssi, date)
                    visit.append(rssi_date)
                # Si es mayor o igual, ese registro hace parte de la siguiente
                # visita. En la lista de listas se agrega la visita anterior
                # y se empieza una nueva con el primer registro.
                else:
                    visitsList.append(visit)
                    visit = [rssi_date]
                previousDate = rssi_date[1]
            # Agrega la última visita.
            visitsList.append(visit)
            
            # Hace falta que que haya mínimo una medición mayor al umbral de visita
            # y que el tiempo entre esa medición y la última sea menor que 5 minutos,
            # para que se considere una visita como tal.
            
            # Lista de los tiempos de visitas reales.
            realVisits = []
            # Se itera la lista de listas anterior
            for visit in visitsList:
                visitSize = len(visit)
                # Se itera la lista de tuplas de registros correspondientes a una
                # visita, hasta el penúltimo registro, y se confirma que pase
                # el umbral de visita, ...
                for i in range(visitSize-1):
                    if visit[i][0] >= self.rssiVisit:
                        # ... de ser así toma el tiempo de diferencia entre dicha
                        # medida y la última, ...
                        visitTime = visit[visitSize-1][1] - visit[i][1]
                        # ... si es mayor que 5 minutos, agrega el tiempo.
                        if visitTime >= 300000:
                            realVisits.append(visitTime)
                        # Como ya encontró la medición requerida, quiebra el for.
                        break

            return realVisits

        # Recibe PCollection anterior en el pipeline y aplica transformación las filas.
        def expand(self, pcoll):
            return (pcoll
                    | beam.MapTuple(lambda client, dataList: (client, self.getVisits(dataList))))
        
    # Calcula el tiempo promedio de las visitas de cada client.
    class VisitorsTimeAvg(beam.PTransform):

        # Calcula el promedio de una lista de enteros, y el valor lo convierte a minutos.
        def mean(self, dataList):
            total = 0
            for data in dataList:
                total += data
            return int(total/(60000*len(dataList)))
        
        def expand(self, pcoll):
            return (pcoll
                | beam.MapTuple(lambda client, dataList: (client, self.mean(dataList))))

    # Recibe un PCollection con filas de la forma (client, número de visitas) y suma las visitas.
    class VisitsSum(beam.PTransform):

        def expand(self, pcoll):
            return (pcoll
                | beam.Map(lambda row: row[1])
                | beam.CombineGlobally(sum))

In \[6\]:

    # Creación de pipeline p con un DataflowRunner por defecto, y recibe las opciones
    # de configuración inicializadas al principio del notebook.
    p = beam.Pipeline(DataflowRunner(), options=options)

In \[7\]:

    # ----- PIPELINE -----

    # Lee el archivo json y filtra para quedarse con las filas que tengan rssi mayor a 10 dBm.
    transeuntes = (p | 'lectura' >> ReadData('gs://jrodriguez-test/FuzzyData.txt')
                     | 'filtroUmbralTranseuntes' >> FilterRssi(10))

    # Toma los id client distintos del PCollection transeuntes
    transeuntesMac = transeuntes | 'transeuntesDistintos' >> Distinct()

    # Cuenta el número de ids únicas de la salida anterior transeuntes.
    totalTranseuntes = transeuntesMac | 'conteoTranseuntes' >> beam.combiners.Count.Globally()

    # A partir de este punto ya registros de mediciones con umbral de transeúntes, ahora
    # queremos encontrar los visitantes.
    visitantesTiempoVisitas = (transeuntes
        # Primer filtro (de dos) para poder pertenecer a una visita.
        | 'filtroUmbralVisitantes' >> FilterRssi(15)
        # Transforma las filas del PCollection recibido a tuplas de la forma ( client, (rssi, date) )
        | 'seleccionVariables' >> beam.Map(lambda x: (x.get('client'),(x.get('rssi'), x.get('date'))))
        # Agrupa por id client único y por cada uno obtiene una lista de tuplas. Cada fila queda de la
        # forma ( client, [lista de tuplas (rssi, date)] )
        | 'agruparMac' >> beam.transforms.core.GroupByKey()
        # Calcula las visitas y convierte cada fila a la forma ( client, [lista de tiempos visitas])
        | 'obtenerVisitas' >> Visits(20)
        # Sólo continúan los que quedaron con elementos en la lista.
        | 'filtroVisitantes' >> beam.Filter(lambda row : len(row[1]) > 0))

    # Recibe la PCollection visitantesTiempoVisitas y calcula el tiempo promedio. Cada fila queda
    # transformada de la forma: (client, tiempo_promedio_visita)
    visitantesTiempoProm = (visitantesTiempoVisitas
        | 'promedio' >> VisitorsTimeAvg())

    # Recibe también la PCollection visitantesTiempoVisitas y hace el conteo de visitantes.
    totalVisitantes = (visitantesTiempoVisitas
        | 'conteoVisitantes' >> beam.combiners.Count.Globally())

    # Recibe también la PCollection visitantesTiempoVisitas y calcula el número de visitas de cada
    # client, mediante la longitud de la lista de tiempos. Cada fila queda de la forma:
    # (client, número_visitas)
    visitasCliente = (visitantesTiempoVisitas
        | 'conteoVisitas' >> beam.MapTuple(lambda client, dataList: (client, len(dataList))))

    # Suma todos los valores de la PCollection visitasCliente para obtener el total de visitas.
    totalVisitas = visitasCliente | 'sumaVisitas' >> VisitsSum()

In \[8\]:

    # Directorio para almacenar los archivos de salida del trabajo.
    output_gcs_location = '%s/output' % dataflow_gcs_location

    # Se incorporan las salidas del pipeline para escribir en Storage
    # en formato csv, mediante PTransforms de entrada/salida.

    (transeuntesMac
        | 'Write transeuntes to GCS' >> beam.io.WriteToText(
            # Nombre del archivo con toda la ruta.
            output_gcs_location + '/transeuntes',
            file_name_suffix='.csv'))

    (totalTranseuntes
        | 'Write totalTranseuntes to GCS' >> beam.io.WriteToText(
            output_gcs_location + '/totalTranseuntes',
            file_name_suffix='.csv'))

    (visitantesTiempoVisitas
        | 'Write visitantesTiempoVisita to GCS' >> beam.io.WriteToText(
            output_gcs_location + '/visitantesTiempoVisitas',
            file_name_suffix='.csv',
            header='mac,tiempos'))
        
    (visitantesTiempoProm
        | beam.Map(lambda x: re.sub("\(|\)|\'", "", str(x)))
        | 'Write visitantesTiempoProm to GCS' >> beam.io.WriteToText(
            output_gcs_location + '/visitantesTiempoProm',
            file_name_suffix='.csv',
            header='mac,tiempoProm'))

    (totalVisitantes
        | 'Write totalVisitantes to GCS' >> beam.io.WriteToText(
            output_gcs_location + '/totalVisitantes',
            file_name_suffix='.csv',
            header='totalVisitantes'))

    (visitasCliente
        | beam.Map(lambda x: re.sub("\(|\)|\'", "", str(x)))
        | 'Write visitasCliente to GCS' >> beam.io.WriteToText(
            output_gcs_location + '/visitasCliente',
            file_name_suffix='.csv',
            header='mac,visitas'))

    (totalVisitas
        | 'Write totalVisitas to GCS' >> beam.io.WriteToText(
            output_gcs_location + '/totalVisitas',
            file_name_suffix='.csv',
            header='totalVisitas'))

Out\[8\]:

    <PCollection[[8]: Write totalVisitas to GCS/Write/WriteImpl/FinalizeWrite.None] at 0x7f521d84d210>

### Running the pipeline<a href="#Running-the-pipeline" class="anchor-link">¶</a>

Now we are ready to run the pipeline on Dataflow. `p.run()` will run the
pipeline and return a pipeline result object.

In \[9\]:

    # Corre pipeline y retorna el resultado.
    pipeline_result = p.run()

    WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.

Let's wait for the job to finish. The following call will block until
the job is finished. It will take a few minutes.

In \[10\]:

    # El siguiente llamado espera hasta que el trabajo concluya. Toma unos minutos.
    pipeline_result.wait_until_finish()

Out\[10\]:

    'DONE'

In \[11\]:

    # Cuando el trabajo terminó podemos mirar muestras de los resultados, mediante el comando gsutil y head
    !gsutil ls {output_gcs_location}

    gs://jrodriguez-test/dataflow/output/totalTranseuntes-00000-of-00001.csv
    gs://jrodriguez-test/dataflow/output/totalVisitantes-00000-of-00001.csv
    gs://jrodriguez-test/dataflow/output/totalVisitas-00000-of-00001.csv
    gs://jrodriguez-test/dataflow/output/transeuntes-00000-of-00003.csv
    gs://jrodriguez-test/dataflow/output/transeuntes-00001-of-00003.csv
    gs://jrodriguez-test/dataflow/output/transeuntes-00002-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitantesTiempoProm-00000-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitantesTiempoProm-00001-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitantesTiempoProm-00002-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitantesTiempoVisitas-00000-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitantesTiempoVisitas-00001-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitantesTiempoVisitas-00002-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitasCliente-00000-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitasCliente-00001-of-00003.csv
    gs://jrodriguez-test/dataflow/output/visitasCliente-00002-of-00003.csv

In \[12\]:

    !gsutil cat {output_gcs_location}/transeuntes* | head -10

    80:4c:55:9b:eb:3a
    34:02:fd:53:d9:02
    00:99:4a:c4:5e:2b
    c8:c1:7f:3f:69:a3
    00:99:4a:9f:29:bf
    c8:c1:7f:b1:80:a6
    44:e3:73:2e:3f:43
    34:02:fd:f6:7a:c7
    c8:c1:7f:32:56:8c
    cc:0e:eb:3b:34:a6

In \[13\]:

    !gsutil cat {output_gcs_location}/totalTranseuntes* | head -10

    14704

In \[14\]:

    !gsutil cat {output_gcs_location}/totalVisitantes* | head -10

    totalVisitantes
    1507

In \[15\]:

    !gsutil cat {output_gcs_location}/visitantesTiempoProm* | head -10

    mac,tiempoProm
    64:25:e6:42:71:00, 6
    34:02:fd:63:ae:d7, 23
    68:4c:32:20:11:c4, 84
    68:cb:dc:95:32:57, 83
    c8:ba:ab:9e:c5:e5, 15
    d8:a9:20:d9:85:c8, 16
    b4:b2:f1:6d:6e:e5, 42
    18:62:4e:db:e7:29, 8
    7c:04:7d:3b:97:6e, 12

In \[16\]:

    !gsutil cat {output_gcs_location}/visitantesTiempoVisitas* | head -10

    mac,tiempos
    ('e4:a0:08:df:57:ee', [6439000, 7704000])
    ('84:39:de:a3:92:e7', [86409000])
    ('bc:be:40:72:fb:d1', [4049000])
    ('a8:f4:9b:f9:0a:34', [892000])
    ('34:5f:58:4f:ab:c1', [3648000])
    ('10:8c:79:06:9f:72', [621000, 391000])
    ('cc:32:de:b1:74:45', [313000])
    ('f4:aa:d4:5b:9a:4f', [1727000])
    ('14:7e:f7:95:85:f1', [2339000])

In \[17\]:

    !gsutil cat {output_gcs_location}/visitasCliente* | head -10

    mac,visitas
    64:25:e6:42:71:00, 1
    34:02:fd:63:ae:d7, 1
    68:4c:32:20:11:c4, 1
    68:cb:dc:95:32:57, 1
    c8:ba:ab:9e:c5:e5, 1
    d8:a9:20:d9:85:c8, 1
    b4:b2:f1:6d:6e:e5, 1
    18:62:4e:db:e7:29, 1
    7c:04:7d:3b:97:6e, 1

In \[18\]:

    !gsutil cat {output_gcs_location}/totalVisitas* | head -10

    totalVisitas
    1653