In [1]:
# Instala o módulo do PySpark
!pip install -q pyspark==3.1.3

[K     |████████████████████████████████| 214.0 MB 8.1 kB/s 
[K     |████████████████████████████████| 198 kB 49.0 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [10]:
# Importa os módulos
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F
import datetime

In [3]:
# Cria a instância da sessão Spark
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [29]:
# Cria dataframe com dados para simular partições
partition_data = [
        ("0","partition=2022-08-20")
        ,("1","partition=2019-07-01")
        ,("2","partition=2019-06-24")
        ,("3","partition=2019-08-24")
        ,("4","partition=2022-07-17")
        ,("5","partition=2022-07-18")
        ,("6","partition=2022-07-19")
        ,("7","partition=2022-07-20")
        ,("8","partition=1987-02-25")
        ,("9","partition=2022-08-30")
        ,("10","partition=2022-09-11")
        ,("11","partition=2022-09-09")
        ,("12","partition=2022-09-12")
        ,("13","partition=2022-09-13")
        ]
dfPartition = spark.createDataFrame(data=partition_data
                                    , schema=["id","partition"])

---
> Para as funções funcionarem em conjunto com o Hive, é necessário adaptar o código para buscar as partições da tabela desejada utilizando um código semelhante ao código abaixo:

> `dataframe = spark.table(f"show partitions {tabela}")`
---

In [54]:
def busca_particao_proxima_tabela(dataframe, dt_referencia=None, diferenca_dias=0, formato_dt_origem='yyyy-MM-dd'):
    '''
        Função para retornar a partição de tabela Hive mais próxima da data informada.
            Parâmetros:
                - dataframe
                    Tipo de dado: dataframe spark
                - dt_referencia
                    Tipo de dado: string
                        Ex.: '2022-09-12'
                - diferenca_dias
                    Tipo de dado: int
                        Ex.: 0
                - formato_dt_origem
                    Tipo de dado: string
                        Ex.: 'yyyy-MM-dd'
    '''

    dt_part_referencia = dt_referencia or datetime.date.today().strftime("%Y-%m-%d")

    try:
            
        df_approx_partition = (
            dataframe
            .select(
                F.regexp_extract('partition', r'([A-Za-z0-9_\-]+)=([A-Za-z0-9_\-]+)',2).alias('partition')
            )
            .withColumn('dt_partition', F.to_date('partition', formato_dt_origem))
            .withColumn('datediff', F.datediff('dt_partition', F.lit(dt_part_referencia).cast('date')))
            .where(F.col('datediff') <= diferenca_dias)
            .orderBy(F.desc('datediff'))
        )

        retorno = df_approx_partition.first()['partition']

    except IndexError:
        retorno = None

    return retorno

In [55]:
# Retorna a partição da tabela mais próxima da data '2022-09-08'
busca_particao_proxima_tabela(dfPartition
                              , dt_referencia='2022-09-08'
                              , diferenca_dias=0
                              , formato_dt_origem='yyyy-MM-dd')

'2022-08-30'

In [52]:
def busca_particao_tabela(dataframe, ordenacao='desc', posicao=0, formato_dt_origem='yyyy-MM-dd'):
    '''
        Função para retornar uma partição de tabela Hive. Também é possível retornar partições em outras posições.
            Parâmetros:
                - dataframe
                    Tipo de dado: dataframe spark
                - ordenacao
                    Tipo de dado: string
                        Ex.: 'desc'
                - posicao
                    Tipo de dado: int
                        Ex.: 
                            0 -> Irá retornar a primeira posição do dataframe
                            1 -> Irá retornar a segunda posição do dataframe
                - formato_dt_origem
                    Tipo de dado: string
                        Ex.: 'yyyy-MM-dd'
    '''
    
    try:

        if ordenacao=='desc':

            df_particao = (
                dataframe
                .select(
                    F.regexp_extract('partition', r'([A-Za-z0-9_\-]+)=([A-Za-z0-9_\-]+)',2).alias('partition')
                )
                .withColumn('dt_partition', F.to_date('partition', formato_dt_origem))
                .orderBy(F.desc('dt_partition'))
            )

        else:
            
            df_particao = (
                dataframe
                .select(
                    F.regexp_extract('partition', r'([A-Za-z0-9_\-]+)=([A-Za-z0-9_\-]+)',2).alias('partition')
                )
                .withColumn('dt_partition', F.to_date('partition', formato_dt_origem))
                .orderBy(F.asc('dt_partition'))
            )

        retorno = df_particao.collect()[posicao]['partition']

    except IndexError:
        retorno = None

    return retorno

In [56]:
# Retorna a partição mais recente da tabela
busca_particao_tabela(dfPartition
                      , ordenacao='desc'
                      , posicao=0
                      , formato_dt_origem='yyyy-MM-dd')

'2022-09-13'