In [15]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/mmkshira/spark-3.2.0-bin-hadoop2.7"

import findspark
findspark.init()


from pyspark.sql import SparkSession
import random

spark = SparkSession.builder.appName("YourTest").master("local[2]").config('spark.ui.port', random.randrange(4000,5000)).getOrCreate()

In [16]:
from pyspark.sql.functions import explode
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws

In [17]:
training_data = spark.read.json("test.json")

                                                                                

In [18]:
training_data.createOrReplaceTempView("test")

In [19]:
testing1 = spark.sql("select * from test")
#testing1.show()
testing1.count()
testing1.columns

                                                                                

['abstract',
 'authors',
 'citation_for_year',
 'citationcount',
 'conferenceseriesid',
 'confname',
 'confplace',
 'confseries',
 'confseriesname',
 'countries',
 'cso_annotated',
 'cso_enhanced_topics',
 'cso_semantic_topics',
 'cso_syntactic_topics',
 'dbpedia_categories',
 'doi',
 'grid_type',
 'id',
 'industrial_sectors',
 'journame',
 'language',
 'papertitle',
 'references',
 'topics',
 'type',
 'urls',
 'year']

In [20]:
testing2 = testing1.select("id",explode("authors").alias("authors"))
testing3 = testing2.select(testing2.id.alias("paper_id"), \
    F.col("authors").getItem("name").alias("name"), \
    F.col("authors").getItem("country").alias("country"), \
    F.col("authors").getItem("id").alias("author_id"), \
    F.col("authors").getItem("affiliation").alias("affiliation"), \
    F.col("authors").getItem("affiliationid").alias("affiliationid"), \
    F.col("authors").getItem("order").alias("order"), \
    )
testing3.createOrReplaceTempView('authors')
testing3.show()

+----------+--------------------+--------------+----------+--------------------+-------------+-----+
|  paper_id|                name|       country| author_id|         affiliation|affiliationid|order|
+----------+--------------------+--------------+----------+--------------------+-------------+-----+
|2009980510|virendra kumar yadav|         India|2345326510|     KIIT University|     67357951|    3|
|2009980510|          rahul paul|         India|2602231438|     KIIT University|     67357951|    1|
|2009980510| anuja kumar acharya|         India|2160146790|     KIIT University|     67357951|    2|
|2009980510|       saumya batham|         India|2680702175|     KIIT University|     67357951|    4|
|1995396835|       krishanu seal|United Kingdom|2228888007|Yahoo (United Kin...|   1325784139|    4|
|1995396835|girish ananthakri...|United Kingdom|2229052760|Yahoo (United Kin...|   1325784139|    5|
|1995396835|prateeksha uday c...|United Kingdom|2612121147|Yahoo (United Kin...|   13257841

In [21]:
paper_id_country=spark.sql(
    """select paper_id, 
    country,  
    count(country) over (partition by paper_id,country) as total_authors_country,
    count(paper_id) over (partition by paper_id) as total_authors from authors"""
)
paper_id=spark.sql(
    """select paper_id,   
    count(paper_id) over (partition by paper_id) as total_authors from authors"""
)


In [22]:
testing4= paper_id_country.selectExpr("paper_id","country","total_authors_country/total_authors As prop").distinct()
testing5 = testing4.select("paper_id","country",concat_ws(':',testing4.country,testing4.prop).alias("country_prop"))
testing6 = testing5.groupby("paper_id").agg(F.concat_ws(", ", F.collect_list(testing5.country_prop))).filter("paper_id = 1996110717")

In [23]:
testing4.show()
testing5.show()
testing6.show()

                                                                                

+--------+-------------+----+
|paper_id|      country|prop|
+--------+-------------+----+
|   14808|         null| 0.0|
|   94027|         null| 0.0|
|  133140|         null| 0.0|
|  151232|         null| 0.0|
|  151232|       France| 0.5|
|  177175|United States| 1.0|
|  190774|      Germany| 1.0|
|  197896|    Singapore| 1.0|
|  198536|         null| 0.0|
|  198536|United States| 0.4|
|  202246|        China| 1.0|
|  204884|         null| 0.0|
|  258746|         null| 0.0|
|  258746|      Austria|0.25|
|  267997|        Egypt|0.25|
|  267997|       Kuwait| 0.5|
|  267997|       Norway|0.25|
|  287947|         null| 0.0|
|  309944|United States| 1.0|
|  344186|         null| 0.0|
+--------+-------------+----+
only showing top 20 rows



                                                                                

+--------+-------------+-----------------+
|paper_id|      country|     country_prop|
+--------+-------------+-----------------+
|   14808|         null|              0.0|
|   94027|         null|              0.0|
|  133140|         null|              0.0|
|  151232|         null|              0.0|
|  151232|       France|       France:0.5|
|  177175|United States|United States:1.0|
|  190774|      Germany|      Germany:1.0|
|  197896|    Singapore|    Singapore:1.0|
|  198536|         null|              0.0|
|  198536|United States|United States:0.4|
|  202246|        China|        China:1.0|
|  204884|         null|              0.0|
|  258746|         null|              0.0|
|  258746|      Austria|     Austria:0.25|
|  267997|        Egypt|       Egypt:0.25|
|  267997|       Kuwait|       Kuwait:0.5|
|  267997|       Norway|      Norway:0.25|
|  287947|         null|              0.0|
|  309944|United States|United States:1.0|
|  344186|         null|              0.0|
+--------+-

[Stage 43:>                                                         (0 + 1) / 1]

+----------+-----------------------------------------+
|  paper_id|concat_ws(, , collect_list(country_prop))|
+----------+-----------------------------------------+
|1996110717|                     South Korea:0.857...|
+----------+-----------------------------------------+



                                                                                

In [24]:
testing2 = testing1.select("id",explode("topics").alias("topic"))
testing2.createOrReplaceTempView('topics')
testing3=spark.sql(
    """select topic,   
    count(id) over (partition by topic) as total_papers_topic from topics"""
)
testing3.distinct().sort("total_papers_topic", ascending= False).show(10,False)




+-----------------------+------------------+
|topic                  |total_papers_topic|
+-----------------------+------------------+
|computer science       |999908            |
|artificial intelligence|183699            |
|computer network       |109615            |
|computer vision        |87246             |
|electrical engineering |64905             |
|mechanical engineering |64275             |
|computer hardware      |62642             |
|real time computing    |54914             |
|distributed computing  |49578             |
|pattern recognition    |47973             |
+-----------------------+------------------+
only showing top 10 rows



                                                                                

In [25]:
# correlated topics with selected topics
sel_topic = "artificial intelligence"
testing2 = testing1.filter(F.array_contains("topics",sel_topic)).select("id",explode("topics").alias("topic")).filter("topic != \'" +sel_topic+ "\'")
testing3 = testing2.groupBy("topic").count().sort("count",ascending=False).show()
testing2.show()




+--------------------+------+
|               topic| count|
+--------------------+------+
|    computer science|183686|
|     computer vision| 87246|
| pattern recognition| 47901|
|    machine learning| 47867|
|natural language ...| 19017|
|         data mining| 14556|
|artificial neural...| 11231|
|  feature extraction| 10842|
|  speech recognition| 10407|
|    image processing| 10324|
|               pixel|  9285|
|computer graphics...|  8581|
|        segmentation|  7620|
|           algorithm|  7576|
|support vector ma...|  7485|
|    cluster analysis|  6232|
|          classifier|  6033|
|  image segmentation|  5792|
|               robot|  5281|
|          robustness|  5159|
+--------------------+------+
only showing top 20 rows

+----------+--------------------+
|        id|               topic|
+----------+--------------------+
|2018731854|     computer vision|
|2018731854|     medical imaging|
|2018731854|            detector|
|2018731854|    computer science|
|2018731854|   3

                                                                                

In [12]:
# Authors for a given topic
sel_topic = "multimedia"
testing2 = testing1.filter(F.array_contains("topics",sel_topic)).select("id",explode("authors").alias("authors"),"topics","type")
testing3 = testing2.select(testing2.id.alias("paper_id"), testing2.topics.alias("topics"), testing2.type.alias("type"), \
    F.col("authors").getItem("name").alias("name"), \
    F.col("authors").getItem("country").alias("country"), \
    F.col("authors").getItem("id").alias("author_id"), \
    F.col("authors").getItem("affiliation").alias("affiliation"), \
    )
testing4 = testing3.groupBy("country").count().sort("count",ascending=False) # country for a given topic
testing4 = testing3.groupBy("type").count().sort("count",ascending=False) # type for a given topic
testing4 = testing3.groupBy("affiliation").count().sort("count",ascending=False) # affiliation for a given topic
testing4 = testing3.groupBy("name").count().sort("count",ascending=False) # authors for a given topic
testing4.show(5,False)



+--------------+-----+
|name          |count|
+--------------+-----+
|jordin t kare |2    |
|lowell l wood |2    |
|richard t lord|2    |
|marie webb    |2    |
|paul holman   |2    |
+--------------+-----+
only showing top 5 rows



In [None]:
# Calculate h-index


In [13]:
# Conferences for a given topic
sel_topic = "computer science"
testing2 = testing1.filter(F.array_contains("topics",sel_topic)).select("id", \
    "confname","type", 'conferenceseriesid', 'confname', 'confplace', 'confseries', 'confseriesname') \
    .filter("confname is not null")


testing3 = testing2.groupBy("confseriesname").count().sort("count",ascending=False) # confseries for a given topic
testing3 = testing2.groupBy("confname").count().sort("count",ascending=False)
testing3.show()


+----------------+-----+
|        confname|count|
+----------------+-----+
|       icml 2019|   35|
|      naacl 2019|   32|
|       iclr 2020|   26|
|      ijcai 2019|   26|
|       iclr 2019|   25|
|       cvpr 2019|   25|
|       aaai 2020|   23|
|      emnlp 2018|   21|
|      ijcai 2018|   20|
|      naacl 2018|   19|
|       aaai 2018|   16|
|     coling 2018|   16|
|     ijcnlp 2019|   15|
|        acl 2019|   15|
|       aaai 2019|   14|
|        acl 2018|   14|
|       lrec 2018|   13|
|interspeech 2018|   13|
|interspeech 2019|   11|
|       iccv 2019|   10|
+----------------+-----+
only showing top 20 rows



In [14]:
# Citations
sel_topic = "multimedia"
testing2 = testing1.filter(F.array_contains("topics",sel_topic)).select("id",explode("industrial_sectors").alias("industrial_sector")).filter("industrial_sectors is not null")
testing2.show(10,False)



+----------+----------------------+
|id        |industrial_sector     |
+----------+----------------------+
|2281235878|computing_and_it      |
|2281235878|technology            |
|2281235878|information_technology|
|2267368397|home_appliances       |
|2267368397|technology            |
|2267368397|electronics           |
|2311189127|electronics           |
|2311189127|technology            |
|2309667953|electronics           |
|2309667953|technology            |
+----------+----------------------+
only showing top 10 rows

