In [1]:
from pyspark.sql import SparkSession
# Cria uma SparkSession
spark = SparkSession.builder.appName("TransformacaoPosicionalParaJson").getOrCreate()

In [2]:
# Criando o dataframe de exemplo
df = spark.createDataFrame([(["Valor1", "Valor2"], ["Valor3", "Valor4"])], ["CampoA", "CampoB"])
df.show()

+----------------+----------------+
|          CampoA|          CampoB|
+----------------+----------------+
|[Valor1, Valor2]|[Valor3, Valor4]|
+----------------+----------------+



In [5]:
from typing import List, Dict
from pyspark.sql.functions import *

def converte_array(df) -> List[Dict]:
    """
    Converte as colunas do tipo Array para uma lista de dicionário
    
    No dataframe de entrada a colunas estão no seguinte formato
    colunaA: [ValorColA1, ValorColA2,..]
    colunaB: [ValorColB1, ValorColB2,..]

    O dicionário de saída é retornado no seguinte formato
    lista_dict = [{"colunaA":ValorColA1, "colunaB":ValorColB1}, 
                  {"colunaA":ValorColA2, "colunaB":ValorColB2}]
    """

    # verifica se todas as colunas sao um array
    if not all([tipo[1] == 'array<string>' for tipo in df.dtypes]):
      raise ValueError('Todas as colunas do dataframe de entrada devem ser do tipo Array')

    # selecionar todas as colunas do DataFrame e aplicar o arrays_zip para criar uma nova coluna Zip
    df_with_zip = df.selectExpr("*", "arrays_zip(*) as Zip")

    # aplicar o explode na coluna Zip para expandir as listas em cada linha
    df_exploded = df_with_zip.select(explode("Zip").alias("Exploded"))

    # selecionar os elementos das listas expandidas e criar uma lista de dicionários
    lista_dict = [dict(zip(df.columns, row["Exploded"])) for row in df_exploded.collect()]

    return lista_dict

In [6]:
json_principal = {"Atributo1": 1, "Atributo2": 2}

In [7]:
listas_ = {"lista_a": df}

for lista in listas_:
  lista_json = converte_array(listas_[lista])
  json_principal[lista] = lista_json

In [8]:
json_principal

{'Atributo1': 1,
 'Atributo2': 2,
 'lista_a': [{'CampoA': 'Valor1', 'CampoB': 'Valor3'},
  {'CampoA': 'Valor2', 'CampoB': 'Valor4'}]}

In [9]:
import unittest
from pyspark.sql import SparkSession
from typing import List, Dict

class TestConverteArray(unittest.TestCase):

    def setUp(self) -> None:
        self.spark = SparkSession.builder.appName("TestConverteArray").getOrCreate()
        self.df = self.spark.createDataFrame([
            (["ValorA1", "ValorA2"], ["ValorB1", "ValorB2"]),
            (["ValorA3", "ValorA4"], ["ValorB3", "ValorB4"])
        ], ["colunaA", "colunaB"])

    def tearDown(self) -> None:
        self.spark.stop()

    def test_converte_array(self):
        """
        Testa se a função converte_array retorna a lista de dicionários correta.
        """
        expected_output = [
            {"colunaA": "ValorA1", "colunaB": "ValorB1"},
            {"colunaA": "ValorA2", "colunaB": "ValorB2"},
            {"colunaA": "ValorA3", "colunaB": "ValorB3"},
            {"colunaA": "ValorA4", "colunaB": "ValorB4"}
        ]
        output = converte_array(self.df)
        self.assertEqual(expected_output, output)

    def test_converte_array_coluna_invalida(self):
        """
        Testa se a função converte_array gera uma exceção para uma coluna que não é do tipo Array.
        """
        df_invalido = self.spark.createDataFrame([
            (1, ["ValorB1", "ValorB2"])
        ], ["colunaA", "colunaB"])
        with self.assertRaises(ValueError):
            converte_array(df_invalido)

    def test_converte_array_valores_nulos(self):
        """
        Testa se a função converte_array lida corretamente com valores nulos nas colunas do DataFrame.
        """
        df_nulos = self.spark.createDataFrame([
            (["ValorA1", None], ["ValorB1", "ValorB2"]),
            (["ValorA3", "ValorA4"], [None, "ValorB4"])
        ], ["colunaA", "colunaB"])
        expected_output = [
            {"colunaA": "ValorA1", "colunaB": "ValorB1"},
            {"colunaA": None, "colunaB": "ValorB2"},
            {"colunaA": "ValorA3", "colunaB": None},
            {"colunaA": "ValorA4", "colunaB": "ValorB4"}
        ]
        output = converte_array(df_nulos)
        self.assertEqual(expected_output, output)

    def test_converte_array_apenas_uma_linha(self):
        """
        Testa se a função converte_array retorna uma lista com apenas um dicionário para um DataFrame com uma única linha.
        """
        df_uma_linha = self.spark.createDataFrame([
            (["ValorA1", "ValorA2"], ["ValorB1", "ValorB2"])
        ], ["colunaA", "colunaB"])
        expected_output = [
            {"colunaA": "ValorA1", "colunaB": "ValorB1"},
            {"colunaA": "ValorA2", "colunaB": "ValorB2"}
        ]
        output = converte_array(df_uma_linha)
        self.assertEqual(expected_output, output)


In [10]:
test_loader = unittest.TestLoader()
suite = test_loader.loadTestsFromTestCase(TestConverteArray)

# Executa os casos de teste
test_runner = unittest.TextTestRunner(verbosity=2)
test_runner.run(suite)

test_converte_array (__main__.TestConverteArray)
Testa se a função converte_array retorna a lista de dicionários correta. ... ok
test_converte_array_apenas_uma_linha (__main__.TestConverteArray)
Testa se a função converte_array retorna uma lista com apenas um dicionário para um DataFrame com uma única linha. ... ok
test_converte_array_coluna_invalida (__main__.TestConverteArray)
Testa se a função converte_array gera uma exceção para uma coluna que não é do tipo Array. ... ok
test_converte_array_valores_nulos (__main__.TestConverteArray)
Testa se a função converte_array lida corretamente com valores nulos nas colunas do DataFrame. ... ok

----------------------------------------------------------------------
Ran 4 tests in 7.684s

OK


<unittest.runner.TextTestResult run=4 errors=0 failures=0>