In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
#Usar ML lib para tratar as categorias dinamicamente
from pyspark.ml.feature import CountVectorizer
start = time.time()

def criarFeatures(df):
  #prepara a coluna categorias para ser usada pela biblioteca - precisa ser um array
  df2 = df.select('id_cliente', split('categorias', ', ').alias('categorias'))

  #gera o vetor de categorias
  cv = CountVectorizer(inputCol='categorias', outputCol='c1', binary=True)
  model = cv.fit(df2)

  #obtem o vocabulario - lista de features
  vocabulary = model.vocabulary

  #define uma UDF para tratar o array como lista na saída
  udf_to_array = udf(lambda v: v.toArray().tolist(), 'array<double>')

  df1 = model.transform(df2)

  return df1.withColumn('c2', udf_to_array('c1')) \
     .select('id_cliente', *[ col('c2')[i].astype('int').alias(vocabulary[i]) for i in range(len(vocabulary))])

#Dataframe exemplo 1
df = spark.createDataFrame([
    ('client-uuid-1',  'cat-1, cat-2, cat-3'),
    ('client-uuid-2',  'cat-1, cat-4, cat-5'),
    ('client-uuid-3',  'cat-6, cat-7'),
    ('client-uuid-4',  'cat-1, cat-2, cat-7, cat-10'),
    ('client-uuid-5',  'cat-8, cat-10'),
    ('client-uuid-6',  'cat-1, cat-9, cat-10'),
    ('client-uuid-7',  'cat-1, cat-4, cat-5, cat-10'),
    ('client-uuid-8',  'cat-7, cat-9'),
    ('client-uuid-9',  'cat-1'),
    ('client-uuid-10', 'cat-1, cat-2, cat-3, cat-4, cat-5, cat-6, cat-7, cat-8, cat-10')
], ['id_cliente', 'categorias'])

#Dataframe exemplo 2
df2 = spark.createDataFrame([
    ('client-uuid-1',  'cat-1, cat-2, cat-3, cat-15'),
    ('client-uuid-2',  'cat-1, cat-4, cat-5, cat-11, cat-14'),
    ('client-uuid-3',  'cat-4, cat-14, cat-15'),
    ('client-uuid-4',  'cat-1, cat-2, cat-7, cat-10'),
    ('client-uuid-5',  'cat-8, cat-10, cat-11'),
    ('client-uuid-6',  'cat-1, cat-9, cat-10, cat-11, cat-13'),
    ('client-uuid-7',  'cat-1, cat-4, cat-5, cat-10'),
    ('client-uuid-8',  'cat-7, cat-9, cat-12, cat-13, cat-14'),
    ('client-uuid-9',  'cat-2'),
    ('client-uuid-10', 'cat-1, cat-2, cat-3, cat-4, cat-5, cat-6, cat-7, cat-8, cat-10')
], ['id_cliente', 'categorias'])

resultado = criarFeatures(df)
resultado2 = criarFeatures(df2)
resultado.select('id_cliente', *sorted(resultado.columns[1:])).show()
resultado2.select('id_cliente', *sorted(resultado2.columns[1:])).show()

print(time.time() - start)