In [1]:
%%init_spark
launcher.jars = ["/opt/jar/spark-xml_2.12-0.9.0.jar"]

In [2]:
import org.apache.spark.sql.SparkSession

val spark= SparkSession
    .builder()
    .appName("graphX")
    .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://0cc13c47e22b:4040
SparkContext available as 'sc' (version = 3.0.1, master = spark://spark-master:7077, app id = app-20210227104443-0000)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@281bba07


In [3]:
import com.databricks.spark.xml._

val rawXML = spark.read.option("rowTag", "MedlineCitation").xml("hdfs://namenode:8020/medline/*.xml")

import com.databricks.spark.xml._
rawXML: org.apache.spark.sql.DataFrame = [Article: struct<Abstract: struct<AbstractText: array<struct<_Label:string,_NlmCategory:string,_VALUE:string>>, CopyrightInformation: string>, ArticleDate: struct<Day: bigint, Month: bigint ... 2 more fields> ... 11 more fields>, ChemicalList: struct<Chemical: array<struct<NameOfSubstance:struct<_UI:string,_VALUE:string>,RegistryNumber:string>>> ... 22 more fields]


In [4]:
rawXML.printSchema()

root
 |-- Article: struct (nullable = true)
 |    |-- Abstract: struct (nullable = true)
 |    |    |-- AbstractText: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _Label: string (nullable = true)
 |    |    |    |    |-- _NlmCategory: string (nullable = true)
 |    |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- CopyrightInformation: string (nullable = true)
 |    |-- ArticleDate: struct (nullable = true)
 |    |    |-- Day: long (nullable = true)
 |    |    |-- Month: long (nullable = true)
 |    |    |-- Year: long (nullable = true)
 |    |    |-- _DateType: string (nullable = true)
 |    |-- ArticleTitle: string (nullable = true)
 |    |-- AuthorList: struct (nullable = true)
 |    |    |-- Author: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- AffiliationInfo: struct (nullable = true)
 |    |    |    |    |    |-- Affiliation: string (nullable

In [5]:
import spark.implicits._
val meshHeadlingList = rawXML.select("MeshHeadingList.MeshHeading")
meshHeadlingList.printSchema()

root
 |-- MeshHeading: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DescriptorName: struct (nullable = true)
 |    |    |    |-- _MajorTopicYN: string (nullable = true)
 |    |    |    |-- _Type: string (nullable = true)
 |    |    |    |-- _UI: string (nullable = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- QualifierName: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _MajorTopicYN: string (nullable = true)
 |    |    |    |    |-- _UI: string (nullable = true)
 |    |    |    |    |-- _VALUE: string (nullable = true)



import spark.implicits._
meshHeadlingList: org.apache.spark.sql.DataFrame = [MeshHeading: array<struct<DescriptorName:struct<_MajorTopicYN:string,_Type:string,_UI:string,_VALUE:string>,QualifierName:array<struct<_MajorTopicYN:string,_UI:string,_VALUE:string>>>>]


In [6]:
val MeshHeadlingElems = meshHeadlingList.withColumn("data", explode($"MeshHeading")).select("data")
MeshHeadlingElems.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- DescriptorName: struct (nullable = true)
 |    |    |-- _MajorTopicYN: string (nullable = true)
 |    |    |-- _Type: string (nullable = true)
 |    |    |-- _UI: string (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |-- QualifierName: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _MajorTopicYN: string (nullable = true)
 |    |    |    |-- _UI: string (nullable = true)
 |    |    |    |-- _VALUE: string (nullable = true)



MeshHeadlingElems: org.apache.spark.sql.DataFrame = [data: struct<DescriptorName: struct<_MajorTopicYN: string, _Type: string ... 2 more fields>, QualifierName: array<struct<_MajorTopicYN:string,_UI:string,_VALUE:string>>>]


In [7]:
val descriptorName = MeshHeadlingElems.select(MeshHeadlingElems.col("data.DescriptorName"))
descriptorName.printSchema()

root
 |-- DescriptorName: struct (nullable = true)
 |    |-- _MajorTopicYN: string (nullable = true)
 |    |-- _Type: string (nullable = true)
 |    |-- _UI: string (nullable = true)
 |    |-- _VALUE: string (nullable = true)



descriptorName: org.apache.spark.sql.DataFrame = [DescriptorName: struct<_MajorTopicYN: string, _Type: string ... 2 more fields>]


In [8]:
val parsedDF = descriptorName.select(descriptorName.col("DescriptorName._MajorTopicYN"),
                                    descriptorName.col("DescriptorName._VALUE").as("topic"))
parsedDF.show

+-------------+--------------------+
|_MajorTopicYN|               topic|
+-------------+--------------------+
|            N|            Behavior|
|            N|Congenital Abnorm...|
|            N|    Disabled Persons|
|            N|             Disease|
|            Y|Intellectual Disa...|
|            N|        Intelligence|
|            Y|Maternal-Fetal Ex...|
|            N|         Personality|
|            N|           Pregnancy|
|            Y|Pregnancy Complic...|
|            N|          Psychology|
|            N|        Reproduction|
|            N|            Research|
|            N|       Contraception|
|            N|Contraceptive Agents|
|            N|Family Planning S...|
|            Y|           Pessaries|
|            N|        Reproduction|
|            Y|     Sperm Transport|
|            Y|Spermatocidal Agents|
+-------------+--------------------+
only showing top 20 rows



parsedDF: org.apache.spark.sql.DataFrame = [_MajorTopicYN: string, topic: string]


In [9]:
var majorTopic = parsedDF.filter(col("_MajorTopicYN") === "Y")
majorTopic = majorTopic.withColumn("topic", regexp_replace(majorTopic("topic"), ", ", ","))
majorTopic.show

+-------------+--------------------+
|_MajorTopicYN|               topic|
+-------------+--------------------+
|            Y|Intellectual Disa...|
|            Y|Maternal-Fetal Ex...|
|            Y|Pregnancy Complic...|
|            Y|           Pessaries|
|            Y|     Sperm Transport|
|            Y|Spermatocidal Agents|
|            Y|Vaginal Creams,Fo...|
|            Y|       Amniocentesis|
|            Y|            Research|
|            Y|    Cesarean Section|
|            Y|     General Surgery|
|            Y|        Hysterectomy|
|            Y|Retrospective Stu...|
|            Y|Sterilization,Rep...|
|            Y|  Animals,Laboratory|
|            Y|            Eugenics|
|            Y|           Aftercare|
|            Y|          Anesthesia|
|            Y|    Cesarean Section|
|            Y|       Contraception|
+-------------+--------------------+
only showing top 20 rows



majorTopic: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_MajorTopicYN: string, topic: string]
majorTopic: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_MajorTopicYN: string, topic: string]


# MeSH 주요 주제와 주제들의 동시발생 분석하기

In [10]:
val topicDist = majorTopic.groupBy("topic").count()
topicDist.orderBy(desc("count")).show

+--------------------+-----+
|               topic|count|
+--------------------+-----+
|            Research| 1649|
|             Disease| 1349|
|           Neoplasms| 1123|
|        Tuberculosis| 1066|
|       Public Policy|  816|
|       Jurisprudence|  796|
|          Demography|  763|
| Population Dynamics|  753|
|           Economics|  690|
|            Medicine|  682|
|Socioeconomic Fac...|  655|
|               Blood|  631|
|            Politics|  631|
|Emigration and Im...|  601|
|       Social Change|  577|
|          Physicians|  560|
|            Mutation|  542|
|    Abortion,Induced|  503|
|          Anesthesia|  483|
|       Public Health|  479|
+--------------------+-----+
only showing top 20 rows



topicDist: org.apache.spark.sql.DataFrame = [topic: string, count: bigint]


In [11]:
val topics = majorTopic.select("topic").rdd.map(el => el.getString(0).split(",").toList)
val onlyTopics =  topics.flatMap(mesh => mesh).toDF("topic")

topics: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[41] at map at <console>:35
onlyTopics: org.apache.spark.sql.DataFrame = [topic: string]


## 문자열들의 목록에서 문자열 2개로 구성된 모든 부분 집합 만들기
원소가 다르게 정렬되어 있는 리스트를 다른 리스트로 보기 때문에    
미리 정렬을 해줘야한다. 그 뒤 scala의 서브 리스트를 만드는 combinations 메소드를 사용한다.

In [12]:
val topicPairs = topics.flatMap(t => {t.sorted.combinations(2)}).toDF("pairs") 
topicPairs.createOrReplaceTempView("topic_pairs")
val cooccurs = spark.sql("""
    SELECT pairs, COUNT(*) cnt
    FROM topic_pairs
    GROUP BY pairs""")
cooccurs.show

+--------------------+---+
|               pairs|cnt|
+--------------------+---+
|[Closed, Ecologic...| 31|
|[Hypertension, Pu...| 18|
|[Halogenated, Hyd...|  2|
|[Ductus Arteriosu...| 11|
|[Hypotension, Ort...|  5|
|    [Biopsy, Needle]| 49|
|[Intraocular, Len...| 46|
|  [Artificial, Skin]| 12|
|[Booksellers', Ca...|  1|
|[Epistasis, Genetic]|  3|
|[Denture, Resin-B...|  1|
|[Comminuted, Frac...|  2|
|  [Medical, Schools]| 84|
|[Carcinoma, Renal...|  7|
|[Epilepsies, Myoc...|  6|
|[Congenital, Hip ...| 10|
|[Gingivitis, Necr...|  8|
|[Chemistry, Clini...|  8|
|[Informal, Social...| 46|
|     [Genes, Lethal]| 15|
+--------------------+---+
only showing top 20 rows



topicPairs: org.apache.spark.sql.DataFrame = [pairs: array<string>]
cooccurs: org.apache.spark.sql.DataFrame = [pairs: array<string>, cnt: bigint]


In [13]:
cooccurs.createOrReplaceTempView("cooccurs")
spark.sql("""
    SELECT pairs, cnt
    FROM cooccurs
    ORDER BY cnt DESC
    LIMIT 20""").collect().foreach(println)

[WrappedArray(Abortion, Induced),503]
[WrappedArray(Biological, Models),471]
[WrappedArray(Education, Medical),467]
[WrappedArray(Chromosomes, Human),404]
[WrappedArray(Infant, Newborn),399]
[WrappedArray(Models, Theoretical),387]
[WrappedArray(Formal, Social Control),348]
[WrappedArray(Attitudes, Health Knowledge),322]
[WrappedArray(Attitudes, Practice),322]
[WrappedArray(Health Knowledge, Practice),322]
[WrappedArray(Antibiotics, Antitubercular),314]
[WrappedArray(Tomography, X-Ray Computed),312]
[WrappedArray(Operative, Surgical Procedures),305]
[WrappedArray(Education, Nursing),260]
[WrappedArray(Ethics, Medical),257]
[WrappedArray(Diseases, Infant),256]
[WrappedArray(Genetic, Transcription),254]
[WrappedArray(Diagnosis, Differential),248]
[WrappedArray(Medicinal, Plants),237]
[WrappedArray(Bone, Fractures),229]


## GraphX로 동시발생 네트워크 구성하기
GraphX는 두 개의 특화된 RDD를 사용해서 그래프를 생성한다
### VertextRDD[VD]는 RDD[(VertexId, VD)]의 구현
    64 bit Long의 Key와 Value
### EdgeRDD[ED]는 RDD[(VertexID, ED)]의 구현
    srcVertexId와 dstVertexId와 Value

## MD5 해시 알고리즘을 사용해서 Vertex에 64bit Key 부여

In [14]:
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

def hashID(str: String): Long = {
    val bytes = MessageDigest.getInstance("MD5").digest(str.getBytes(StandardCharsets.UTF_8))
    (bytes(0) & 0xFFL) |
    ((bytes(1) & 0xFFL) << 8)  |
    ((bytes(2) & 0xFFL) << 16) |
    ((bytes(3) & 0xFFL) << 24) | 
    ((bytes(4) & 0xFFL) << 32) |
    ((bytes(5) & 0xFFL) << 40) |
    ((bytes(6) & 0xFFL) << 48) |
    ((bytes(7) & 0xFFL) << 56)
}

import java.nio.charset.StandardCharsets
import java.security.MessageDigest
hashID: (str: String)Long


## VertexRDD - 64 bit Long의 Key와 Value

In [15]:
import org.apache.spark.sql.Row

val vertices = onlyTopics.map{ case Row(topic: String) => (hashID(topic), topic) }.toDF("hash", "topic")
vertices.show(false)

+--------------------+-----------------------+
|hash                |topic                  |
+--------------------+-----------------------+
|5187539882274411027 |Intellectual Disability|
|7326766905191375254 |Maternal-Fetal Exchange|
|-8335899805523560725|Pregnancy Complications|
|6668370814094416392 |Pessaries              |
|2165268630633201617 |Sperm Transport        |
|7410582300145776325 |Spermatocidal Agents   |
|-2715598449643586692|Vaginal Creams         |
|-5490501322948777906|Foams                  |
|6797329277563646197 |and Jellies            |
|-8710082263013033965|Amniocentesis          |
|5873630755945588720 |Research               |
|-6195495891807228194|Cesarean Section       |
|-4584508663902919400|General Surgery        |
|-6313877427211760081|Hysterectomy           |
|3931245851669231450 |Retrospective Studies  |
|7711775517543139760 |Sterilization          |
|-5551980622857818596|Reproductive           |
|6222546531752250163 |Animals                |
|273296691941

import org.apache.spark.sql.Row
vertices: org.apache.spark.sql.DataFrame = [hash: bigint, topic: string]


## EdgeRDD - srcVertexId와 dstVertexId와 Value

In [16]:
import org.apache.spark.graphx._

val edges = cooccurs.map{ case Row(pairs: Seq[_], cnt: Long) =>
    val ids = pairs.map(_.toString).map(hashID).sorted
    Edge(ids(0), ids(1), cnt)
}

import org.apache.spark.graphx._
edges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[Long]] = [srcId: bigint, dstId: bigint ... 1 more field]


In [17]:
val vertexRDD = vertices.rdd.map{
    case Row(hash: Long, topic: String) => (hash, topic)
}
val topicGraph = Graph(vertexRDD, edges.rdd)
topicGraph.cache()

vertexRDD: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[65] at map at <console>:43
topicGraph: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@1d575146
res10: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@1d575146


https://spark.apache.org/docs/latest/img/property_graph.png

## 모든 Vertex에서 다른 모든 Vertex로 이어지는 Path를 가지는 Connected Subgraph를 생성
https://drek4537l1klr.cloudfront.net/bonaci/Figures/09fig03.jpg

In [18]:
val connectedComponentGraph = topicGraph.connectedComponents()

connectedComponentGraph: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Long] = org.apache.spark.graphx.impl.GraphImpl@f30442b


In [19]:
val componentDF = connectedComponentGraph.vertices.toDF("vid", "cid")
componentDF.show(false)

+--------------------+--------------------+
|vid                 |cid                 |
+--------------------+--------------------+
|-7316262928265344779|-7316262928265344779|
|6081464828435935967 |6081464828435935967 |
|-3298299725015287104|-3298299725015287104|
|1790883385483333827 |1790883385483333827 |
|6371846346169607292 |6371846346169607292 |
|9083303858128127359 |-9215470674759766104|
|-2892570574836872675|-2892570574836872675|
|4144959121306352236 |4144959121306352236 |
|-2256043373786658093|-6731727939376452220|
|7171990309415822392 |7171990309415822392 |
|-7567449790538207579|-7567449790538207579|
|-6856105093952105058|-6856105093952105058|
|2277126264016938007 |-9146446762266834735|
|-4510394959788367402|-4510394959788367402|
|2026738476704047088 |2026738476704047088 |
|-5074639714252992290|-5074639714252992290|
|-3670369270656251168|-3670369270656251168|
|3684435136196876397 |3684435136196876397 |
|-265354314421783823 |-9215470674759766104|
|8355045401803691586 |8355045401

componentDF: org.apache.spark.sql.DataFrame = [vid: bigint, cid: bigint]


In [20]:
val componentCounts = componentDF.groupBy("cid").count()
componentCounts.count()

componentCounts: org.apache.spark.sql.DataFrame = [cid: bigint, count: bigint]
res12: Long = 12185


## 각 연결 성분의 크기를 구해본다

In [21]:
componentCounts.orderBy(desc("count")).show

+--------------------+-----+
|                 cid|count|
+--------------------+-----+
|-9215470674759766104| 1229|
|-8958016315901741476|   43|
|-8534530815268459613|   31|
|-8525945984722225322|   27|
|-8433229734442232152|   23|
|-5431966423110682938|   13|
|-6457402890108996741|   13|
|-8778799276582343892|   12|
|-8952683923920091838|   12|
|-2504097438637703249|   11|
|-7654163459406679088|    9|
|-7101834924453003464|    9|
|-8900385585274452121|    8|
|-9117649955724002678|    8|
|-7694471437534469153|    8|
|-2234424701633221593|    7|
|-2442449324161724517|    7|
|-7763756269967324984|    7|
|-4801324290586934533|    7|
|-8077949023993713521|    7|
+--------------------+-----+
only showing top 20 rows



## cid count가 상위 7번째(13) 따리를 조회

In [22]:
val testGraphConnect = componentDF.filter(col("cid") === "-5431966423110682938")
testGraphConnect.show

+--------------------+--------------------+
|                 vid|                 cid|
+--------------------+--------------------+
| 8084519340451788661|-5431966423110682938|
| 7095041754108558476|-5431966423110682938|
| 7839629013181357020|-5431966423110682938|
|-5120990266664752595|-5431966423110682938|
|-1228431817674920367|-5431966423110682938|
|-5431966423110682938|-5431966423110682938|
| 3312266615297311284|-5431966423110682938|
| 8179512113853959277|-5431966423110682938|
|-4064173520721180199|-5431966423110682938|
| 3497129956290009190|-5431966423110682938|
| 4664620325710429792|-5431966423110682938|
| 6775005617735922441|-5431966423110682938|
|   28792762178501292|-5431966423110682938|
+--------------------+--------------------+



testGraphConnect: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [vid: bigint, cid: bigint]


## Topic 을 확인하기 위해 Join

In [23]:
val joinExp = vertices.col("hash") === testGraphConnect.col("vid")
val joinWithVertexName = testGraphConnect.join(vertices, joinExp).distinct()

joinWithVertexName.orderBy(col("topic")).show

+--------------------+--------------------+--------------------+--------------------+
|                 vid|                 cid|                hash|               topic|
+--------------------+--------------------+--------------------+--------------------+
| 8084519340451788661|-5431966423110682938| 8084519340451788661|         Atherogenic|
| 7839629013181357020|-5431966423110682938| 7839629013181357020|Carbohydrate-Rest...|
|   28792762178501292|-5431966423110682938|   28792762178501292|          Cariogenic|
|-4064173520721180199|-5431966423110682938|-4064173520721180199|            Diabetic|
| 3497129956290009190|-5431966423110682938| 3497129956290009190|                Diet|
| 4664620325710429792|-5431966423110682938| 4664620325710429792|      Fat-Restricted|
| 3312266615297311284|-5431966423110682938| 3312266615297311284|         Gluten-Free|
| 8179512113853959277|-5431966423110682938| 8179512113853959277|            High-Fat|
| 7095041754108558476|-5431966423110682938| 7095041754

joinExp: org.apache.spark.sql.Column = (hash = vid)
joinWithVertexName: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [vid: bigint, cid: bigint ... 2 more fields]


## Diet와 유사한 주제 중 Edge에 포함되지 않은 주제는?

In [24]:
topicDist.filter($"topic".contains("Diet")).show

+--------------------+-----+
|               topic|count|
+--------------------+-----+
|    Dietary Services|    2|
|Diet,Protein-Rest...|    6|
|        Diet Therapy|   40|
|           Diet Fads|    5|
|     Calcium,Dietary|   60|
|       Diet,High-Fat|    4|
|     Diet,Vegetarian|    6|
|Diet,Carbohydrate...|    1|
| Cholesterol,Dietary|    6|
|      Sodium,Dietary|   40|
|       Dietary Fiber|   13|
|     Dietary Sucrose|    1|
|       Diethylamines|    1|
|           Dietetics|   42|
|    Diet,Gluten-Free|    2|
|        Diet Surveys|   13|
|Recommended Dieta...|    2|
|    Dietary Proteins|   46|
|  Diethylnitrosamine|    4|
|Dietary Carbohydr...|   28|
+--------------------+-----+
only showing top 20 rows



In [25]:
topicDist.filter($"topic".contains("Demography")).show

+----------+-----+
|     topic|count|
+----------+-----+
|Demography|  763|
+----------+-----+



## 차수( 각 Vertex에 연결된 Edge의 수)의 분포

In [26]:
val degrees: VertexRDD[Int] = topicGraph.degrees.cache()

degrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[373] at RDD at VertexRDD.scala:57


## degrees 메소드로 반환한 그래프 각 Vertex의 차수

In [27]:
val testVertexDegree = degrees.toDF("vertexID", "degree")

testVertexDegree.where(col("vertexID") === "-5431966423110682938").show
println(testVertexDegree.count)

+--------------------+------+
|            vertexID|degree|
+--------------------+------+
|-5431966423110682938|     1|
+--------------------+------+

2352


testVertexDegree: org.apache.spark.sql.DataFrame = [vertexID: bigint, degree: int]


## 차수 분포의 기본적인 요약 통계

In [28]:
testVertexDegree.describe("degree").show

+-------+------------------+
|summary|            degree|
+-------+------------------+
|  count|              2352|
|   mean|1.9838435374149659|
| stddev| 3.238130572112704|
|    min|                 1|
|    max|                61|
+-------+------------------+



In [29]:
val topicGraphVerteciesCount = topicGraph.vertices.count()

topicGraphVerteciesCount: Long = 14174


In [30]:
val singleTopics = topics.filter(x => x.size == 1)
singleTopics.count()

singleTopics: org.apache.spark.rdd.RDD[List[String]] = MapPartitionsRDD[390] at filter at <console>:41
res20: Long = 245292


## 전체 문서 중에 외톨이 주제의 수

In [31]:
val singleTopicsDistinct = singleTopics.flatMap(topic => topic).distinct().toDS()
singleTopicsDistinct.count()

singleTopicsDistinct: org.apache.spark.sql.Dataset[String] = [value: string]
res21: Long = 12389


## topicPairs에 이미 포함되어 있는 것을 except 한 아무 것도 연결되어 있지 않은 Topic의 수 

In [32]:
val singleTopicInPairs = topicPairs.flatMap(_.getAs[Seq[String]](0))
singleTopicsDistinct.except(singleTopicInPairs).count()

singleTopicInPairs: org.apache.spark.sql.Dataset[String] = [value: string]
res22: Long = 11822


## 그래프의 총 Vertex 수 = degrees RDD의 원소 수 + 외톨이 Topic 수 

In [33]:
topicGraphVerteciesCount == (testVertexDegree.select("degree").count() + singleTopicsDistinct.except(singleTopicInPairs).count())

res23: Boolean = true


In [34]:
val namesAndDegrees = degrees.innerJoin(topicGraph.vertices) {
    (topicId, degree, name) => (name, degree.toInt)
}.values.toDF("topic", "degree")

namesAndDegrees: org.apache.spark.sql.DataFrame = [topic: string, degree: int]


## Degree가 높은 Topic을 뽑아낸다

In [35]:
namesAndDegrees.orderBy(desc("degree")).show

+------------+------+
|       topic|degree|
+------------+------+
|       Genes|    61|
|       Human|    57|
|   Receptors|    40|
|    Hospital|    38|
| Chromosomes|    38|
|      Dental|    35|
|     Medical|    25|
|    Antigens|    25|
|     Genetic|    25|
|   Hospitals|    24|
|   Bacterial|    23|
|Tuberculosis|    23|
|       Viral|    23|
|      Animal|    23|
|   Carcinoma|    21|
|     Nursing|    21|
|  Artificial|    21|
|         RNA|    19|
|   Education|    18|
|      Models|    18|
+------------+------+
only showing top 20 rows



## 관련성이 낮은 관계를 필터링하기 : Chi-Squared Test
### 전체 문서의 수

In [37]:
val T = majorTopic.count() 
sc.broadcast(T)

T: Long = 280464
res25: org.apache.spark.broadcast.Broadcast[Long] = Broadcast(125)


### RDD로 만드는 해쉬 값과 Count

In [38]:
val topicDistRdd = topicDist.map{
    case Row(topic: String, count: Long) => (hashID(topic), count)
}.rdd

topicDistRdd: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[482] at rdd at <console>:43


In [39]:
val topicDistGraph = Graph(topicDistRdd, topicGraph.edges)

topicDistGraph: org.apache.spark.graphx.Graph[Long,Long] = org.apache.spark.graphx.impl.GraphImpl@5562fa59


In [40]:
def chiSq(YY: Long, YB: Long, YA: Long, T: Long): Double = {
    val NB = T - YB // B가 나오지 않음
    val NA = T - YA // A가 나오지 않음
    val YN = YA - YY // A 나오고 B 나오지 않음
    val NY = YB - YY // A 나오지 않고 B 나옴
    val NN = T - NY - YN - YY // A 와 B 모두 나오지 않음
    val inner = math.abs(YY * NN - YN * NY) - T / 2.0 // 카이제곱 통계량 1
    T * math.pow(inner, 2) / (YA * NA * YB * NB) // 카이제곱 통계량 2
}

chiSq: (YY: Long, YB: Long, YA: Long, T: Long)Double


## EdgeTriplet 자료 구조 ( srcVertexID, Attr , DstVertexID)
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/graphx/EdgeTriplet.html

In [41]:
val topicDistGraphTriplet = topicDistGraph.triplets.map(triplet => 
    (triplet.srcAttr, triplet.srcId, triplet.attr, triplet.dstId, triplet.dstAttr))
    .toDF("srcId", "srcAttr",  "attr", "dstId", "dstAttr")
topicDistGraphTriplet.show

+-----+--------------------+----+--------------------+-------+
|srcId|             srcAttr|attr|               dstId|dstAttr|
+-----+--------------------+----+--------------------+-------+
|    0|-9182438712389191341|   5| -389298284109391318|     54|
|    0|-7872097114296777293|   2| 1202294856466702536|     16|
|    0|-7149230680597744952|   2| 5589473318573193898|      0|
|    0|-6680067852188841900|   1| 7818223270735667332|      0|
|  183|-6279404004170606079|  18|-6078785589870891357|      0|
|    0|-5774599100051916967|   3| 6136359576808376643|      0|
|    0|-3918893881484148728|   1| 4545427657671254648|      0|
|   15|-1512285294743677509|  46| 1023245654305693193|      0|
|  114|-1191649091021856552|  49| 2207317118408894164|      0|
|    0| 2125035285081874317|  12| 3011389593876829075|    180|
|    0| 2832577980852704136|  31| 8607497299412972547|      0|
|   20| 3409758096235920035|  11| 6323290579067087369|      0|
|    0|-7957937824582824477|  15|-4129130605120804884| 

topicDistGraphTriplet: org.apache.spark.sql.DataFrame = [srcId: bigint, srcAttr: bigint ... 3 more fields]


## Triplet의 값을 사용해서 Chi-Square Test 결과를 Edge로 하는 새로운 그래프를 반환한다.

In [42]:
val chiSquaredGraph = topicDistGraph.mapTriplets(triplet => {
    chiSq(triplet.attr, triplet.srcAttr, triplet.dstAttr, T)
})
chiSquaredGraph.edges.map(x => x.attr).stats()

chiSquaredGraph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@d461e68
res27: org.apache.spark.util.StatCounter = (count: 2333, mean: NaN, stdev: NaN, max: Infinity, min: 0.355924)


## ChiSquared 결과가 극단적인 19.5 이상으로 상대적으로 의미 없는 Edge를 제거한다.

In [43]:
val interesting = chiSquaredGraph.subgraph(
    triplet => triplet.attr > 19.5)
interesting.edges.count  // 0 - 2333, 19.5 2326

interesting: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@71d8ecfe
res28: Long = 2326


## Edge가 필터링된 그래프를 분석한다

In [44]:
val interestingComponentGraph = interesting.connectedComponents()
val icDF = interestingComponentGraph.vertices.toDF("vid", "cid")
val icCountDF = icDF.groupBy("cid").count()
icCountDF.count() // Edge의 수 

interestingComponentGraph: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Double] = org.apache.spark.graphx.impl.GraphImpl@38d14997
icDF: org.apache.spark.sql.DataFrame = [vid: bigint, cid: bigint]
icCountDF: org.apache.spark.sql.DataFrame = [cid: bigint, count: bigint]
res29: Long = 14346


## Edge의 크기

In [45]:
icCountDF.orderBy(desc("count")).show

+--------------------+-----+
|                 cid|count|
+--------------------+-----+
|-9215470674759766104| 1213|
|-8958016315901741476|   43|
|-8534530815268459613|   31|
|-8525945984722225322|   27|
|-8433229734442232152|   23|
|-9195187390356502787|   15|
|-5431966423110682938|   13|
|-6457402890108996741|   13|
|-8952683923920091838|   12|
|-8778799276582343892|   12|
|-2504097438637703249|   11|
|-7101834924453003464|    9|
|-7654163459406679088|    9|
|-8900385585274452121|    8|
|-9117649955724002678|    8|
|-7694471437534469153|    8|
|-2234424701633221593|    7|
|-7763756269967324984|    7|
|-4801324290586934533|    7|
|-8077949023993713521|    7|
+--------------------+-----+
only showing top 20 rows



## 필터링 된 차수의 크기

In [46]:
val interestingDegrees = interesting.degrees.cache()
interestingDegrees.map(_._2).stats()

interestingDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[757] at RDD at VertexRDD.scala:57
res31: org.apache.spark.util.StatCounter = (count: 2351, mean: 1.978732, stdev: 3.228102, max: 61.000000, min: 1.000000)


## 필터링 된 그래프에서의 Topic과 Degree

In [47]:
interestingDegrees.innerJoin(topicGraph.vertices) {
    (topicId, degree, name) => (name, degree)
}.values.toDF("topic", "degree").orderBy(desc("degree")).show

+------------+------+
|       topic|degree|
+------------+------+
|       Genes|    61|
|       Human|    57|
|   Receptors|    40|
|    Hospital|    38|
| Chromosomes|    38|
|      Dental|    35|
|     Genetic|    25|
|    Antigens|    25|
|     Medical|    25|
|   Hospitals|    24|
|      Animal|    23|
|       Viral|    23|
|   Bacterial|    23|
|Tuberculosis|    22|
|  Artificial|    21|
|     Nursing|    21|
|   Carcinoma|    20|
|         RNA|    19|
|   Education|    18|
|      Models|    18|
+------------+------+
only showing top 20 rows



## 고급 그래프 속성 몇가지를 계산
    Collective Dynamics of 'Small-world Networks'
    완전 그래프 - 그래프에서 서로 다른 모든 Vertex가 반드시 연결되어 있는 그래프
    
    어떤 그래프가 완전 부분그래프(Clique)를 가지는 것을 찾는것이 NP-Complete Prob
    간접적으로 Triangle Count( Vertex 세개로 이루어진 완전 그래프)를 이용한다.
    Triangle Count를 사용하는 지역 군집 계수 ( Local Clustering Coefficient )
    식 = 실제 존재하는 트라이 앵글 수  /  만들 수 있는 전체 트라이앵글 수


## 각 Vertex의 트라이앵글 수를 가진 그래프를 반환

In [48]:
val triCountGraph = interesting.triangleCount()
triCountGraph.vertices.map(x => x._2).stats()

triCountGraph: org.apache.spark.graphx.Graph[Int,Double] = org.apache.spark.graphx.impl.GraphImpl@2ffad3b2
res33: org.apache.spark.util.StatCounter = (count: 16333, mean: 0.029756, stdev: 0.464996, max: 34.000000, min: 0.000000)


## 한 Vertex의 전체 트라이앵글 수

In [49]:
val maxTrisGraph = interestingDegrees.mapValues(d => d * (d-1) / 2.0)

maxTrisGraph: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[821] at RDD at VertexRDD.scala:57


## 두 그래프를 조인해서  지역 군집계수를 구한다

In [50]:
val clusterCoef = triCountGraph.vertices.innerJoin(maxTrisGraph) { 
    (vertexId, triCount, maxTris) => {if (maxTris == 0) 0 else triCount / maxTris}
}

clusterCoef: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[823] at RDD at VertexRDD.scala:57


## 네트워크의 평균 군집 계수 

In [51]:
clusterCoef.map(_._2).sum() / interesting.vertices.count()

res34: Double = 0.009933650080270988


## Pregel을 사용해서 평균 경로 길이 계산하기
    Vertex간 경로(Path)의 길이를 구하기
~~~
        for
            => 각 꼭짓점과 그 꼭짓점 까지의 거리의 목록을 만든다.
            => 아웃 노드들에 그들이 보유한 목록을 질의한다.
            => 자신에게 없는 꼭짓점 까지의 거리를 추가 갱신한다.
        end 더이상 추가할 수 없을 때 까지 반복한다.
~~~
    Pregel 은 Computation(계산)과 Communication(통신) 두 단계로 나누어 병렬 프로그래밍

        Computation : 각 Vertex에서 내부 상태를 검사해 다른 Vertex에 'Message'를 보낼 것을 정한다.
        Communication : 이전 통신 단계의 결과로 나온 메시지를 적당한 Vertex로 보낼 수 있도록 경로를 지정

    Pregel API 를 사용할 때 구현해야할 함수
        1. 각 Vertex의 상태를 추적하는 함수 ( 어떤 상태를 추적할 건지 )
        2. 이웃하는 Vertex의 각 쌍을 평가한 후, 다음 단계에 보낼 함수를 정하는 함수
        3. 받은 메시지 들을 통합해서 자신(Vertex)의 상태를 업데이트 함수
    

## 평균 경로 길이 문제 
    각 Vertex의 상태 : 알려진 다른 ( VertexId, 거리 ) -> Map[VertexId, Int] ( 룩업 테이블)
    각 Vertex에 전달할 메시지 : ( VertexId, 거리 ) -> Map[VertexId, Int] ( 룩업 테이블 )


## 도착한 메시지 정보를 Vertexd의 상태에 병합하는 함수

In [52]:
def mergeMaps(m1: Map[VertexId, Int], m2: Map[VertexId, Int]) : Map[VertexId, Int] = {
    def minThatExists(k: VertexId): Int = {
        math.min(m1.getOrElse(k, Int.MaxValue), m2.getOrElse(k, Int.MaxValue))
    }
    
    (m1.keySet ++ m2.keySet).map(k => (k, minThatExists(k))).toMap // 동시에 나타는 VertexId에 대해서 더 작은 값을 보관한다
}

mergeMaps: (m1: Map[org.apache.spark.graphx.VertexId,Int], m2: Map[org.apache.spark.graphx.VertexId,Int])Map[org.apache.spark.graphx.VertexId,Int]


## Vertex 업데이트 함수

In [53]:
def update(id: VertexId, state: Map[VertexId, Int], msg: Map[VertexId, Int]) = {
    mergeMaps(state, msg)
}

update: (id: org.apache.spark.graphx.VertexId, state: Map[org.apache.spark.graphx.VertexId,Int], msg: Map[org.apache.spark.graphx.VertexId,Int])Map[org.apache.spark.graphx.VertexId,Int]


## 이웃하는 Vertex에서 받은 정보를 보고 각 Vertex에 보낼 메시지 생성 함수

In [54]:
def checkIncrement(a: Map[VertexId, Int], b: Map[VertexId, Int], bid: VertexId) = {
    val aplus = a.map { case (v, d) => v -> (d+1) } // 거리 1 증가
    if(b != mergeMaps(aplus, b)) {
        Iterator((bid, aplus)) // 이웃 Vertex의 상태와 다르다면 이웃 Vertex에 결과를 보낼 메시지 생성
    } else {
        Iterator.empty
    }
}

checkIncrement: (a: Map[org.apache.spark.graphx.VertexId,Int], b: Map[org.apache.spark.graphx.VertexId,Int], bid: org.apache.spark.graphx.VertexId)Iterator[(org.apache.spark.graphx.VertexId, scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int])]


## src 와 dst 양 Vertex에서 메시지를 갱신 시 사용할 함수

In [55]:
def iterate(e: EdgeTriplet[Map[VertexId, Int], _]) = {
    checkIncrement(e.srcAttr, e.dstAttr, e.dstId) ++
    checkIncrement(e.dstAttr, e.srcAttr, e.srcId)
}

iterate: (e: org.apache.spark.graphx.EdgeTriplet[Map[org.apache.spark.graphx.VertexId,Int], _])Iterator[(org.apache.spark.graphx.VertexId, scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int])]


## Vertex를 Sampling 해서 새로운 Graph를 생성

In [56]:
val fraction = 0.02
val replacement = false
val sample = interesting.vertices.map(v => v._1).sample(replacement, fraction, 1729L)
val ids = sample.collect().toSet

fraction: Double = 0.02
replacement: Boolean = false
sample: org.apache.spark.rdd.RDD[org.apache.spark.graphx.VertexId] = PartitionwiseSampledRDD[827] at sample at <console>:43
ids: scala.collection.immutable.Set[Long] = Set(-2946899861498087798, -7913216201026662864, -76428294520429966, 2499204606670907436, -3285152267756629768, -9005957016891921496, 6037965845032188647, 5668309178822636501, 3993598833166579669, 1191901616057028428, 8064563839628739696, -925832195390866027, -8174457967231783561, -1602707214362909118, 5479593410478892393, -3604279977562363738, 3126407477913472028, -7596449335273103275, 912942725752082835, -6879831100030416881, -4815080670761796817, -6266048249759281991, 5813898916814956936, 7720856311546693293, 479674692011546890, 7964233329544335291, -37787311291635943...


In [57]:
val mapGraph = interesting.mapVertices((id, _) => {
    if (ids.contains(id)) {
        Map(id -> 0)
    } else {
        Map[VertexId, Int]()
    }
})

mapGraph: org.apache.spark.graphx.Graph[scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int],Double] = org.apache.spark.graphx.impl.GraphImpl@17d6b899


## Vertex의 메시지를 선언하고 ( 비어있음 ) pregel 메소드를 호출

In [58]:
val start = Map[VertexId, Int]()
val res = mapGraph.pregel(start)(update, iterate, mergeMaps)

start: scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int] = Map()
res: org.apache.spark.graphx.Graph[scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int],Double] = org.apache.spark.graphx.impl.GraphImpl@207d1065


## Path 길이를 계산한다. ( VertexId, VertexId, Int )

In [60]:
val paths = res.vertices.flatMap{ case(id, m) => // VertexId, ( VertexId, Int )
    m.map { case(k, v) => // ( VertexId, Int )
        if (id < k) {
            (id, k, v)
        } else {
            (id, k, v)
        }
    }
}.distinct()
paths.cache()

paths: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId, Int)] = MapPartitionsRDD[1158] at distinct at <console>:41
res36: paths.type = MapPartitionsRDD[1158] at distinct at <console>:41


In [61]:
val pathDF = paths.toDF("SrcVertexId", "DstVertexId", "PathLen")
pathDF.show

+--------------------+--------------------+-------+
|         SrcVertexId|         DstVertexId|PathLen|
+--------------------+--------------------+-------+
|  395415489013661543|-6064124848399142155|      8|
|-1953899191912595410|-6064124848399142155|      6|
|-7693150486571694714| 4148042476509963484|      9|
|  311563963288822342|-6475986039297157456|      7|
|-7683845844397460107|-5512369437406212956|      5|
|    9929880474613702| 3080671367692401373|      8|
| 6715451969093686253| -112645709515239560|      5|
|-3858832936894511624|-6475986039297157456|      7|
|-1295125037716930763|-1521890788461760039|      8|
|-2089196482998081636| -112645709515239560|      7|
| 6222546531752250163|-6266048249759281991|      7|
|-2233884578870028995| 8834212968550452686|      6|
| -697814771640862887| 1695642729486467421|      6|
| -231263932096822161|-7398369319326415525|     11|
| 7170544481691569991| 4637386053499667192|      8|
| -673151094204633006|-2766443787435473830|      8|
|-6729516762

pathDF: org.apache.spark.sql.DataFrame = [SrcVertexId: bigint, DstVertexId: bigint ... 1 more field]


In [62]:
pathDF.filter(col("PathLen") > 0).describe("PathLen").show()

+-------+------------------+
|summary|           PathLen|
+-------+------------------+
|  count|             26889|
|   mean| 7.103164862955111|
| stddev|2.4962618636291096|
|    min|                 1|
|    max|                18|
+-------+------------------+



In [63]:
pathDF.groupBy("PathLen").count().show

+-------+-----+
|PathLen|count|
+-------+-----+
|     12|  787|
|      1|   82|
|     13|  362|
|      6| 4261|
|     16|   26|
|      3| 1099|
|      5| 3408|
|     15|   81|
|      9| 2708|
|     17|    9|
|      4| 2150|
|      8| 4150|
|      7| 4188|
|     10| 1749|
|     11| 1194|
|     14|  140|
|      2|  494|
|      0|  321|
|     18|    1|
+-------+-----+

