In [1]:
import pyspark
import json
import random
from pyspark.sql.functions import col
from pyspark.sql import functions as fn
import pyspark.ml.feature as ft
import pyspark.ml.clustering as clus

# Reuse the active Spark session, or create one named "stats_and_sankey".
spark = pyspark.sql.SparkSession.builder.appName("stats_and_sankey").getOrCreate()


# NOTE(review): SEED is not referenced by any cell visible in this notebook.
SEED = 42
# Presumably the number of LDA topics (topic_titles has 45 entries); it is only
# used to size the show() call that prints the per-title counts.
num_topics = 45
# Input: JSON dataset loaded with spark.read.json.
file_in = "atari_topic_names/"
# Input: directory holding the saved CountVectorizer and LDA models.
models_in = "atari_models/"
# Output: path prefix for every file this notebook writes.
files_out = "atari_sankey_and_stats/"

In [2]:
# Restore the fitted CountVectorizer and LDA models saved under `models_in`.
print('cargando modelos en memoria...')

print('- vectorizer...')
vectorizer_path = models_in+'/count_vectorizer_model'
count_vectorizer_model_load = ft.CountVectorizerModel.load(vectorizer_path)

print('- lda...')
lda_path = models_in+'/lda_model'
lda_model_load = clus.DistributedLDAModel.load(lda_path)

cargando modelos en memoria...
- vectorizer...
- lda...


In [3]:
# Build, for every LDA topic, the list of its top 15 (term, weight) pairs.
print("calculando resumen de topics...")
topics = lda_model_load.describeTopics(15).take(100)

# Fetch the vocabulary once: the model attribute is looked up on every access,
# so reading it inside the inner loop repeats that work for every term.
vocabulary = count_vectorizer_model_load.vocabulary

topic_terms = dict()   # topic id -> [(term, weight within that topic), ...]
topic_index = dict()   # vocabulary index -> first (term, weight) pair seen for it
for topic in topics:
    terms_in_topic = []
    for term_idx, weight in zip(topic['termIndices'], topic['termWeights']):
        term = vocabulary[term_idx]
        # topic_index keeps its original "first occurrence" content ...
        topic_index.setdefault(term_idx, (term, weight))
        # ... but the per-topic list must carry THIS topic's weight. Reusing the
        # cached pair gave a term shared by several topics the weight it had in
        # the first topic where it appeared.
        terms_in_topic.append((term, weight))

    topic_terms[topic['topic']] = terms_in_topic

calculando resumen de topics...


In [11]:
# Write a plain-text table with the first seven terms of every topic.
# The context manager closes the file even if a write fails part-way through.
with open(files_out+"topic_terms_table.txt", "w") as f:
    for i, terms in topic_terms.items():
        termlist = [t[0] for t in terms[:7]]
        f.write("#{}: {}\n".format(i, termlist))

In [7]:
# Dump the topic -> [(term, weight), ...] mapping as JSON.
# The context manager closes the file even if serialisation or the write fails.
with open(files_out+"topic_terms.json", "w") as f:
    f.write(json.dumps(topic_terms))

In [12]:
# Two-column DataFrame built from topic_terms: `topic_id` holds the dict key and
# `tuple` holds that topic's list of (term, weight) pairs. Joined on topic_id below.
lookup_weights = spark.sparkContext.parallelize(topic_terms.items()).toDF(["topic_id", "tuple"])


In [13]:
# Load the JSON dataset found under `file_in`.
# NOTE(review): later cells read topic_id, topic_title, forum_title, post_text,
# post_date, thread_title and thread_code_url from it — confirm the input schema.
df = spark.read.json(file_in)

In [10]:
# Corpus-wide weight of every term: each topic's term weight is multiplied by the
# number of rows assigned to that topic, and the products are summed per term.
#
# The original chain called `.na.fill("tuple", "--")` after the join. na.fill takes
# (value, subset), so that asked to fill a column named "--" with the string
# "tuple". Even with the arguments in the right order a string fill cannot apply
# to the array-typed `tuple` column. The call is therefore dropped; topics with no
# match in lookup_weights have a null `tuple` and are discarded by explode().
df_term_weight = (
    df.groupBy('topic_id')
    .count()
    .join(lookup_weights, ["topic_id"], "left")
    # One output row per (topic, term) pair.
    .withColumn('tuple', fn.explode(col('tuple')))
    # tuple._1 is the term, tuple._2 its weight inside the topic.
    .select(col("topic_id"), col("tuple._1"), col('tuple._2'), col('count'))
    .withColumn('weight_per_topic', col('_2') * col('count'))
    .select(col('_1').alias('term'), col('weight_per_topic'))
    .groupBy('term')
    .agg(fn.sum('weight_per_topic').alias('weight'))
    # 45 topics x 15 terms per topic is the largest possible number of distinct terms.
    .take(45*15)
)

In [11]:
# Attempt to even out the font sizes a little.
import math
def sigmoid(x):
    """Return the logistic function of ``x`` shifted to pass through the origin.

    The result lies in [-0.5, 0.5] and ``sigmoid(0) == 0``.

    The two branches are algebraically identical; each one only evaluates
    ``exp`` of a non-positive number, so very large negative inputs no longer
    raise OverflowError as ``math.e ** (-x)`` did.
    """
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x)) - 0.5
    z = math.exp(x)
    return z / (1.0 + z) - 0.5

In [16]:
# Turn the aggregated term weights into word-cloud entries whose `value` is a
# font size between min_font_size and max_font_size.
max_font_size = 100
min_font_size = 10

all_weights = [r['weight'] for r in df_term_weight]
# Guard the empty case: min()/max() raise on an empty sequence.
min_weight = min(all_weights) if all_weights else 0
max_weight = max(all_weights) if all_weights else 0
# Weight units per font-size unit.
factor = (max_weight - min_weight) / (max_font_size - min_font_size)


def _font_size(weight):
    """Map ``weight`` linearly from [min_weight, max_weight] to the font-size range."""
    if factor == 0:
        # Every term has the same weight (or there is a single term): there is
        # nothing to scale, so use the middle of the range instead of dividing by 0.
        return (max_font_size + min_font_size) // 2
    # The minimum weight is subtracted first; without that the smallest term was
    # drawn larger than min_font_size and the largest exceeded max_font_size.
    return int(round((weight - min_weight) / factor + min_font_size))


# Unused alternative: squash the weights with sigmoid() before scaling.
vega_term_weights = [{"text": r['term'], 'value': _font_size(r['weight'])} for r in df_term_weight]

# The easiest way to render them is http://trifacta.github.io/vega/editor/
with open(files_out+"vega_term_weights.json", "w") as f:
    f.write(json.dumps(vega_term_weights))


In [13]:
# Hand-assigned category for every LDA topic id (0-44). Several topic ids share
# the same category; the trailing comments note what each topic is mostly about.
topic_titles = {
    0: "Dev",
    1: "HW", # mods, repairs
    2: "HW", # mods
    3: "Market",
    4: "Games",  # releases
    5: "Console Systems", # console parts, details, photos, marketplace...
    6: "Music",  # dev, soundtracks...
    7: "Games",
    8: "Games",
    9: "Dev",
    10: "HW",  # mods, upgrades
    11: "Games", # discussion, expo, conventions
    12: "Dev", # + Game discussion
    13: "Games", # play, guides, cheats
    14: "Console Systems", # or Games; it is very specific
    15: "HW", # joystick-specific
    16: "Social", # games, console systems, discussion...
    17: "Social", # discussion, what if, favorites...
    18: "Dev",
    19: "HW",
    20: "Social", # console systems, mods, devs, help...
    21: "Console Systems",
    22: "Social", # expos, scans, magazines
    23: "HW", 
    24: "Social", # games, hw
    25: "Social", # games, recommendations
    26: "HW", # mods, controllers
    27: "Market",
    28: "HW", # diy, problems..
    29: "Social", # but about marketplace subjects
    30: "Social", # questions, offtopic, recommendations
    31: "Dev", # DIY
    32: "Dev", # with a lot of HW
    33: "Console Systems", # Console systems... very specific
    34: "Market",
    35: "Social", # mostly collections, discussions
    36: "Social", # nintendo, videos
    37: "Games", # discussion
    38: "Social", # discussion
    39: "Social", # collections, looking for stuff...
    40: "Games", # expo, releases
    41: "Market",
    42: "HW", # mostly video, + marketplace
    43: "Social", # discussion about everything
    44: "Console Systems"
 }

# Palette used to colour the Sankey links, one entry per topic category.
colors = ['rgb(141,211,199)','rgb(255,255,179)','rgb(190,186,218)','rgb(251,128,114)','rgb(128,177,211)','rgb(253,180,98)','rgb(179,222,105)','rgb(252,205,229)','rgb(217,217,217)','rgb(188,128,189)','rgb(204,235,197)']

# Every category that can appear as a topic title: the hand-assigned ones plus
# the two labels applied by the relabelling cell.
topic_list = {t for t in topic_titles.values()}
topic_list.add('Non-English')
topic_list.add('HSC')

# Iterate in sorted order: the iteration order of a set of strings changes
# between interpreter runs, which gave each category a different colour on
# every execution. The modulo keeps the lookup valid if there are ever more
# categories than palette entries (colours then repeat).
topic_color = {}
for i, v in enumerate(sorted(topic_list)):
    topic_color[v] = colors[i % len(colors)]
    
# Two-column DataFrame (topic_id, topic_title) built from topic_titles; the
# relabelling cell joins it onto the dataset by topic_id.
lookup = spark.sparkContext.parallelize(topic_titles.items()).toDF(["topic_id", "topic_title"])


In [14]:

# One Sankey link per (forum, topic title) pair, weighted by its number of rows.
# Only the first 1000 pairs, in sorted order, are kept.
links_A = (
    df.groupBy(col('forum_title'), col('topic_title'))
    .count()
    .sort(col('forum_title'), col('topic_title'))
    .take(1000)
)

links = [
    {
        'source': e['forum_title'],
        'target': e['topic_title'],
        'value': e['count'],
        'type': e['topic_title'],
        # Titles without an assigned colour fall back to black.
        'color': topic_color.get(e['topic_title'], 'rgb(0,0,0)'),
    }
    for e in links_A
]

# The context manager closes the file even if serialisation or the write fails.
with open(files_out+"links.json", "w") as f:
    f.write(json.dumps(links))


print("links done ##############################################################")

# Every distinct endpoint of a link becomes a Sankey node.
node_set = set()
for link in links:
    node_set.update((link['source'], link['target']))

# The node id doubles as its display title.
nodes = [{'id': t, 'title': t} for t in node_set]

# The context manager closes the file even if serialisation or the write fails.
with open(files_out+"nodes.json", "w") as f:
    f.write(json.dumps(nodes))


print("nodes done ##############################################################")

# Alphabetical node order for each side of the diagram, restricted to the titles
# that actually appear as link endpoints.
df_forum_titles = df.select(col('forum_title')).distinct().sort(col('forum_title')).take(1000)
order_forum_titles = [r['forum_title'] for r in df_forum_titles if r['forum_title'] in node_set]

df_topic_titles = df.select(col('topic_title')).distinct().sort(col('topic_title')).take(1000)
order_topic_titles = [r['topic_title'] for r in df_topic_titles if r['topic_title'] in node_set]

# Two outer entries (forums first, topic titles second), each wrapping a single
# list of node ids.
order = [
    [order_forum_titles],
    [order_topic_titles],
]

# The context manager closes the file even if serialisation or the write fails.
with open(files_out+"order.json", "w") as f:
    f.write(json.dumps(order))

print("order done ##############################################################")


# topic_groups = {t for t in topic_titles.values()}
# groups = [{'title': t, 'type': 'process', 'id': t, 'bundle': None, 'def_pos': None, 'nodes': ["{}^*".format(t)]} for t in topic_groups]

# f = open(files_out+"groups.json","w+")
# f.write(json.dumps(groups))
# f.close()


# print("groups done ##############################################################")


# Bundle links, ordering and nodes into the single document written to sankey.json.
sankey = {
    "links": links,
    "order": order,
    "nodes": nodes,
    # "groups": groups
}


# The context manager closes the file even if serialisation or the write fails.
with open(files_out+"sankey.json", "w") as f:
    f.write(json.dumps(sankey))

print("all done ##############################################################")


links done ##############################################################
nodes done ##############################################################
order done ##############################################################
all done ##############################################################


In [15]:
# Row count per topic title, alphabetically; show() is allowed up to num_topics + 2 rows.
df.groupBy('topic_title').count().orderBy('topic_title').show(num_topics+2)

+---------------+------+
|    topic_title| count|
+---------------+------+
|Console Systems|364803|
|            Dev|339077|
|          Games|566014|
|            HSC|122358|
|             HW|475348|
|         Market|606303|
|          Music| 28121|
|    Non-English|  4332|
|         Social|755891|
+---------------+------+



In [16]:
# Number of rows per topic id, written twice: ordered by id and ordered by count.
topic_freq = df.groupBy('topic_id').count()

# Run both Spark actions before opening the file, so a failed job cannot leave a
# half-written report or an unclosed handle behind.
topic_freq_1 = topic_freq.orderBy('topic_id').take(100)
topic_freq_2 = topic_freq.orderBy('count').take(100)

with open(files_out+"topic_freq.txt", "w") as f:
    f.write('by topic_id:\n')
    for r in topic_freq_1:
        f.write("#{}: {}\n".format(r['topic_id'], r['count']))

    f.write(('*' * 40) + '\n')

    f.write('by count:\n')
    for r in topic_freq_2:
        f.write("#{}: {}\n".format(r['topic_id'], r['count']))

In [82]:
# Collapse the posts of each (thread, topic id, topic title) group into one row:
# the post texts joined by a space, the earliest post date and the smallest
# thread title of the group.
threads = (
    df.groupBy('thread_code_url', 'topic_id', 'topic_title')
    .agg(
        fn.collect_list('post_text').alias('thread_text'),
        fn.min('post_date').alias('thread_date'),
        fn.min('thread_title').alias('thread_title'),
    )
    .withColumn('thread_text', fn.concat_ws(' ', fn.col('thread_text')))
)
threads.createOrReplaceTempView('threads')

# Keep the most recent threads of every topic id (rank <= 20 by date, newest
# first). rank() assigns equal ranks to tied dates, so a topic can contribute
# more than 20 rows.
print("grabando ejemplo de topics a disco...")
summary = spark.sql("""
SELECT
  topic_id,
  topic_title,
  thread_title,
  thread_text
FROM (
  SELECT
    topic_id,
    topic_title,
    thread_text,
    thread_title,
    rank() OVER (PARTITION BY topic_id ORDER BY thread_date DESC) as rank
  FROM threads) tmp
WHERE
  rank <= 20
ORDER BY topic_title, topic_id ASC
""").take(100000)

grabando ejemplo de topics a disco...


In [83]:
# Imported under an alias because a later cell rebinds the name `html` to the
# generated page, which would shadow the module on a re-run of this cell.
from html import escape as html_escape

# One HTML table row per sampled thread. Titles and texts come straight from
# forum posts, so they are escaped: a stray "<", ">" or "&" would otherwise break
# the table or inject markup into the page. str() keeps the previous rendering
# of non-string values (ids, missing titles).
row = [
    "<tr><td>t{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(
        html_escape(str(r['topic_id'])),
        html_escape(str(r['topic_title'])),
        html_escape(str(r['thread_title'])),
        html_escape(str(r['thread_text'])),
    )
    for r in summary
]


In [84]:
# Join the table rows with newlines and drop them into a minimal HTML page.
table_rows = "\n".join(row)
html = """<html>
<head></head>
<body>
<table border=1>{}</table>
</body>
</html>""".format(table_rows)

In [85]:
# Write the sample page. The encoding is pinned to UTF-8 because the page holds
# arbitrary forum text and the platform default may not be able to encode it;
# the context manager closes the file even if the write fails.
with open(files_out+"summary.html", "w", encoding="utf-8") as f:
    f.write(html)

Sólo si hace falta planchar otra vez los topic titles

In [80]:
# Reload the dataset and discard any topic title columns it already carries, so
# the next cell can assign them again from scratch.
df = spark.read.json(file_in).drop('topic_title_2').drop('topic_title')

In [81]:
# Attach the hand-assigned title of each topic id. Rows whose topic id has no
# entry in `lookup` get the placeholder "--".
# na.fill takes (value, subset); the original call passed them the other way
# round, so it targeted a column named "--" and null titles were never replaced.
df = df.join(lookup, ["topic_id"], "left").na.fill("--", ["topic_title"])
df.createOrReplaceTempView('df')
print("renombrando algunos topic titles...")

# Posts in the international forum are labelled 'Non-English' regardless of topic.
spark.sql("""CREATE OR REPLACE TEMP VIEW non_english AS
SELECT *, IF(forum_code = '9-international/', 'Non-English', topic_title) AS topic_title_2 FROM df""")

# Posts in a High Score Club subforum are labelled 'HSC'; this takes precedence
# over the 'Non-English' label because it is applied afterwards.
spark.sql("""CREATE OR REPLACE TEMP VIEW hsc AS
SELECT *,  IF(subforum_title like '%High Score Club%', 'HSC', topic_title_2) AS topic_title_3 FROM non_english""")

# Keep only the final label, under the original column name.
df = (
    spark.sql("SELECT * FROM hsc")
    .drop('topic_title').drop('topic_title_2')
    .withColumnRenamed('topic_title_3', 'topic_title')
)


renombrando algunos topic titles...
