In [1]:
from pyspark.sql import SparkSession

def get_spark(memory, delta = False):
    """
    Cria uma sessão do spark e uma sessão do delta lake
    
    Args:
        memory: (int) valor da memória do driver e executor
        delta: (bool) flag para definir o uso do delta lake
    Returns:
        (spark session), (delta lake session, optional)
    """
    spark = (
        SparkSession
        .builder
        .config('spark.sql.broadcastTimeout', '360000')
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
        .config('spark.driver.memory', f'{memory}G')
        .config('spark.executor.memory', f'{memory}G')
        .config('spark.driver.maxResultSize', '4G')
        .config('spark.sql.debug.maxToStringFields', 100)
        .config('spark.ui.showConsoleProgress', 'true')
        .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
        .config("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
        
        .getOrCreate()
    )

    if delta:
        from delta.tables import DeltaTable
        return spark, DeltaTable

    return spark

In [2]:
spark, delta = get_spark(1, delta = True)

22/03/14 14:38:22 WARN Utils: Your hostname, luan-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/03/14 14:38:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/luan/anaconda3/envs/futebol/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/luan/.ivy2/cache
The jars for the packages stored in: /home/luan/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3d5c4c87-75e2-4547-b07f-4767719918f3;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 1024ms :: artifacts dl 29ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtim

In [4]:
spark.createDataFrame

<bound method SparkSession.createDataFrame of <pyspark.sql.session.SparkSession object at 0x7fc0dc136be0>>

In [5]:
delta

delta.tables.DeltaTable

In [78]:
from cafu.metadata import campeonato_espn

In [79]:
campeonatos = campeonato_espn()
metadata = {'jogos_ids':{c: {} for c in campeonatos}, 
            'partidas': {c: {} for c in campeonatos}}

In [80]:
metadata

{'jogos_ids': {'brasil': {}, 'espanha': {}, 'franca': {}},
 'partidas': {'brasil': {}, 'espanha': {}, 'franca': {}}}

In [81]:
for c in campeonatos:
    for t in campeonatos[c]:
        metadata['partidas'][c][t] = {}

In [82]:
metadata

{'jogos_ids': {'brasil': {}, 'espanha': {}, 'franca': {}},
 'partidas': {'brasil': {'2021-2021': {},
   '2020-2020': {},
   '2019-2019': {},
   '2018-2018': {},
   '2017-2017': {},
   '2016-2016': {},
   '2015-2015': {},
   '2014-2014': {}},
  'espanha': {'2021-2022': {},
   '2020-2021': {},
   '2019-2020': {},
   '2018-2019': {},
   '2017-2018': {},
   '2016-2017': {},
   '2015-2016': {},
   '2014-2015': {},
   '2013-2014': {},
   '2012-2013': {},
   '2011-2012': {}},
  'franca': {'2021-2022': {}, '2020-2021': {}, '2019-2020': {}}}}

In [83]:
from datetime import date
import json
import pandas as pd
from cafu.metadata.paths import path
path_datalake = path('datalake')

In [89]:
today = date.today()

r = open(path_datalake+'/metadata.json', 'r')
metadata_datalake = json.load(r)

campeonatos = [
               [k1, k2] for k1 in list(metadata_datalake['jogos_ids'].keys()) 
                        for k2 in list(metadata_datalake['jogos_ids'][k1].keys())
                        if metadata_datalake['jogos_ids'][k1][k2] != 'failed'
              ]

jogos_atrasados = {}
for c in campeonatos:
    df = pd.read_csv(path_datalake+f'/jogos_ids/{c[0]}/{c[1]}.csv')
    jogos_ocorridos = df[df['dates']<str(today)]['jogo_id']
    jogos_atualizados = []
    for j in metadata['partidas'][c[0]][c[1]]:
        if metadata['partidas'][c[0]][c[1]][j]['status'] != 'failed':
            jogos_atualizados.append(j)

    jogos_atrasados_c =  list(set(jogos_ocorridos).difference(jogos_atualizados))
    if len(jogos_atrasados_c)>0:
        try:
            jogos_atrasados[c[0]][c[1]] = jogos_atrasados_c
        except:
            jogos_atrasados[c[0]] = {}
            jogos_atrasados[c[0]][c[1]] = jogos_atrasados_c

In [93]:
jogos_atrasados['espanha']['2020-2021']

[581823,
 581824,
 581825,
 581826,
 581827,
 581828,
 581829,
 581830,
 581831,
 581832,
 581833,
 581834,
 581835,
 581836,
 581837,
 581838,
 581839,
 581840,
 581841,
 581842,
 581843,
 581844,
 581845,
 581846,
 581847,
 581848,
 581849,
 581850,
 581851,
 581852,
 581853,
 581854,
 581855,
 581856,
 581857,
 581858,
 581859,
 581860,
 581861,
 581862,
 581863,
 581864,
 581865,
 581866,
 581867,
 581868,
 581869,
 581870,
 581871,
 581872,
 581873,
 581874,
 581875,
 581876,
 581877,
 581878,
 581879,
 581880,
 581881,
 581882,
 581883,
 581884,
 581885,
 581886,
 581887,
 581888,
 581889,
 581890,
 581891,
 581892,
 581893,
 581894,
 581895,
 581896,
 581897,
 581898,
 581899,
 581900,
 581901,
 581902,
 581903,
 581904,
 581905,
 581906,
 581907,
 581908,
 581909,
 581910,
 581911,
 581912,
 581913,
 581914,
 581915,
 581916,
 581917,
 581918,
 581919,
 581920,
 581921,
 581922,
 581923,
 581924,
 581925,
 581926,
 581927,
 581928,
 581929,
 581930,
 581931,
 581932,
 581933,
 

In [71]:
jogos_ocorridos = df[df['dates']<str(today)]['jogo_id']

In [72]:
metadata['partidas'][c[0]][c[1]]['598226'] = {'status':'success', 'times': ['bahia', 'cruzeiro'], 'dir': '2022-05-14'}

In [73]:
c = ['espanha', '2020-2021']

In [76]:
jogos_atualizados = []
for j in metadata['partidas'][c[0]][c[1]]:
    if metadata['partidas'][c[0]][c[1]][j]['status'] != 'failed':
        jogos_atualizados.append(j)
        
jogos_atrasados =  list(set(jogos_ocorridos).difference(jogos_ocorridos))

In [77]:
jogos_atrasados

[]

In [46]:
jogos_atualizados = [
                     metadata['partidas'][c][t] for c in metadata['partidas']
                                                for t in metadata['partidas'][c]
                                                #if metadata['partidas'][c][t]['status'] != 'failed'
                    ]

In [47]:
jogos_atualizados

[{},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {'Real Madrid vs Barcelona': 13},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {},
 {}]