# Index Wikipedia into Solr

* Assumes Wikipedia has been pre-processed into delta tables as per the notebook [wikipedia_gis_analysis_with_h3_and_deckgl.ipynb](wikipedia_gis_analysis_with_h3_and_deckgl.ipynb)

In [1]:
required_packages = {"mwparserfromhell","pysolr","boltons"}

import pkg_resources
for lib in required_packages - {pkg.key for pkg in pkg_resources.working_set}:
    print(f"installing {lib}")
    %pip install -q --upgrade pip
    %pip install -q $lib
    pkg_resources.require(lib)


installing pysolr
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
installing mwparserfromhell
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
installing boltons
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
solr_host = '192.168.12.110'
solr_collection = 'wiki'

In [4]:
import pysolr
solr = pysolr.Solr(f'http://{solr_host}:8983/solr/{solr_collection}', always_commit=True)
solr.ping()

'{\n  "responseHeader":{\n    "zkConnected":null,\n    "status":0,\n    "QTime":2,\n    "params":{\n      "q":"{!lucene}*:*",\n      "distrib":"false",\n      "df":"_text_",\n      "rows":"10",\n      "echoParams":"all",\n      "rid":"-1"}},\n  "status":"OK"}\n'

In [2]:
if not "spark" in locals():
    import pyspark
    MAX_MEMORY = "8g"  # 24 gives OOM here. # 6 gives "out of heap space"
    spark = (pyspark.sql.SparkSession.builder.appName("MyApp") 
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") 
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        .config("spark.executor.memory", MAX_MEMORY) 
        .config("spark.driver.memory", MAX_MEMORY) 
        .config("spark.python.worker.reuse",False)
        .config("spark.task.maxFailures",5)
        .enableHiveSupport() 
        .getOrCreate()        
        )
spark

In [3]:
spark.sql('select count(*),count(distinct title) from  wikipedia_silver_structured_templates').collect()

[Row(count(1)=21108360, count(DISTINCT title)=21108326)]

In [5]:
spark.sql('select * from  wikipedia_silver_structured_templates limit 2').printSchema()

root
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- infoboxes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- params: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- body: string (nullable = true)
 |-- templates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- params: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- body: string (nullable = true)
 |-- extlinks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- wikilinks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text

In [11]:
spark.sql('select title,infoboxes from  wikipedia_silver_structured_templates limit 20').show(20,50)

+--------------------------------------------------+--------------------------------------------------+
|                                             title|                                         infoboxes|
+--------------------------------------------------+--------------------------------------------------+
|                                  Jessica Cattaneo|[{Infobox swimmer, {name -> Jessica Cattaneo, i...|
|                         Category:Monza Rally Show|                                                []|
|                                    Royal fortress|                                                []|
|                                       Erica Musso|[{Infobox swimmer, {name -> Erica Musso, image ...|
|                             Wallace Edwin Sturgis|                                                []|
|                       Prêmio Jabuti de Literatura|                                                []|
|                                          Totbrief|            

In [154]:
solr_url = f'http://{solr_host}:8983/solr/{solr_collection}'
from boltons.iterutils import remap
import datetime

class SolrForeachWriter:
    def open(self, partition_id, epoch_id):
        self._partition_id = partition_id
        self._epoch_id     = epoch_id
        self._batch_size   = 10000
        self._solr_url     = solr_url
        self.pending_docs  = []
        return True
        
    def insert_pending_docs(self):
        try:
            if self.pending_docs == []: return
            solr = pysolr.Solr(self._solr_url)
            solr.add(self.pending_docs)
            solr.commit()
            self.pending_docs = []
        except Exception as e:
            errmsg = f"{str(e)} adding {self.pending_docs}"
            raise
            
    def process(self,row):
        def map_to_solr_types(path,key,value):
            if value is None:
                return False
            if isinstance(value,datetime.datetime):
                return key,value.isoformat()+"Z"
            return key,value
        solr_data = remap(row.asDict(),visit=map_to_solr_types)
        self.pending_docs.append(solr_data)
        if len(self.pending_docs) >= self._batch_size:
            self.insert_pending_docs()
        return True
    
    def close(self,error):
        self.insert_pending_docs()
        return True

In [155]:
# spark.sql("""DROP TABLE IF EXISTS wikipedia_for_solr""")
# spark.sql("""
#           CREATE TABLE IF NOT EXISTS wikipedia_for_solr (
#               id           string,
#               title_txt_en string,
#               body_txt_en  array<string>,
#               person_s     array<string>,
#               h3_1_s       array<string>,
#               h3_2_s       array<string>,
#               tags         array<string>
#           ) USING DELTA
# """)
###  gives errors like
###     AnalysisException: Table default.wikipedia_for_solr does not support either micro-batch or continuous scan.;
###  when used as a streaming source.

df = spark.createDataFrame([],"""
              id           string,
              title_txt_en string,
              body_txt_en  array<string>,
              person_s     array<string>,
              h3_1_s       array<string>,
              h3_2_s       array<string>,
              tags         array<string>
""")
(df.write
   .format('delta')
   .mode('overwrite')
   .option("mergeSchema", "true")
   .save('./tables/try_2_wikipedia_for_solr')
)


Cute workaround for MERGE requiring matching columns...

*  https://mungingdata.com/pyspark/union-unionbyname-merge-dataframes/

In [198]:
new_rows = spark.sql("""
  select base64(sha2(title, 224)) as id,
  title as title_txt_en,
  array('') as person_s
  from  wikipedia_silver_structured_templates
  where lower(title) like '%dog%' limit 1000000
""")
new_rows = spark.sql('select * from wikipedia_for_solr limit 0').unionByName(new_rows,allowMissingColumns=True)
new_rows.show()

+--------------------+--------------------+-----------+--------+------+------+----+
|                  id|        title_txt_en|body_txt_en|person_s|h3_1_s|h3_2_s|tags|
+--------------------+--------------------+-----------+--------+------+------+----+
|0cDExrekUzzUnblPC...|            True dog|       null|      []|  null|  null|null|
|zXtrS7/VTZPHXnqvL...|2010–11 Minnesota...|       null|      []|  null|  null|null|
|pJafRXTBkEaAhZS7l...|2019-20 Minnesota...|       null|      []|  null|  null|null|
|EKfAaB6845wlWPQ9E...|File:The Dog Star...|       null|      []|  null|  null|null|
|50ndknKe8lgvu2bNN...|Category:Wingate ...|       null|      []|  null|  null|null|
|xsohacu8rfUbslR7O...|Category:Gardner–...|       null|      []|  null|  null|null|
|0Y2kKLJ4NZpkLBGJv...|Category:Louisian...|       null|      []|  null|  null|null|
|ZYpIfJZ4cs/Rs7/IV...|Category:Alabama ...|       null|      []|  null|  null|null|
|7cz83066VzyfR4LsN...|2003–04 Butler Bu...|       null|      []|  null|  nul

In [205]:
(new_rows.write
   .format('delta')
   .mode('overwrite')
   .option("mergeSchema", "true")
   .save('/tmp/new_rows')
)

In [209]:
for active_stream in spark.streams.active:
    print(f"Stream: {active_stream.name} - {active_stream.explain()}")
    print(dir(active_stream))
    #active_stream.stop()
    

In [207]:
stream2 = spark.readStream.format("delta").option("ignoreChanges",True).load("/tmp/new_rows")
stream2.isStreaming

True

In [210]:
sq = stream2.writeStream \
  .trigger(once=True) \
  .foreach(SolrForeachWriter()) \
  .option('checkpointLocation','/tmp/new_rows_checkpoint3') \
  .outputMode("update") \
  .start()

#### workaround this: 

* https://github.com/delta-io/delta/issues/594
* https://stackoverflow.com/questions/66106096/delta-lake-insert-into-sql-in-pyspark-is-failing-with-java-lang-nosuchmethoder


In [219]:
solr.search(
    **{'q':'title_txt_en:cat','fl':'title_txt_en'}
           ).raw_response



{'responseHeader': {'status': 0,
  'QTime': 0,
  'params': {'q': 'title_txt_en:cat', 'fl': 'title_txt_en', 'wt': 'json'}},
 'response': {'numFound': 536,
  'start': 0,
  'numFoundExact': True,
  'docs': [{'title_txt_en': 'Cats'},
   {'title_txt_en': 'Black cat, White cat'},
   {'title_txt_en': 'Cat Roberts'},
   {'title_txt_en': 'Cats and the Fiddle'},
   {'title_txt_en': 'Cunning Cat'},
   {'title_txt_en': 'Gypsy  &  the Cat'},
   {'title_txt_en': 'Cat Doucet'},
   {'title_txt_en': 'Oscar the cat'},
   {'title_txt_en': 'Cat Boyd'},
   {'title_txt_en': "The Cat's Bah"}]}}

In [194]:
solr.delete(q='*:*')

'<?xml version="1.0" encoding="UTF-8"?>\n<response>\n\n<lst name="responseHeader">\n  <int name="status">0</int>\n  <int name="QTime">120</int>\n</lst>\n</response>\n'

## Merge into is broken with Solr 3.1.1 and Delta.io 0.8

In [125]:
new_rows.createOrReplaceTempView("new_rows")
buggy_spark = True
if not buggy_spark:
    spark.sql("""
        create table tmp_new_rows_tbl using delta as select * from new_rows
    """)
    spark.sql("""
        MERGE INTO delta.`tables/try_2_wikipedia_for_solr`
        USING tmp_new_rows_tbl
          ON new_rows.id = wikipedia_for_solr.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
else:
    spark.sql("""
        drop table if exists tmp_new_rows_tbl
    """)
    spark.sql("""
        DELETE FROM wikipedia_for_solr 
    """)
    spark.sql("""
        INSERT INTO delta.`tables/try_2_wikipedia_for_solr`
        SELECT * From new_rows
    """)

AnalysisException: Table not found: delta.tables/try_2_wikipedia_for_solr;
'InsertIntoStatement 'UnresolvedRelation [delta, tables/try_2_wikipedia_for_solr], [], false, false, false
+- Project [id#20292, title_txt_en#20293, body_txt_en#20294, person_s#20295, h3_1_s#20296, h3_2_s#20297, tags#20298]
   +- SubqueryAlias new_rows
      +- Union false, false
         :- GlobalLimit 0
         :  +- LocalLimit 0
         :     +- Project [id#20292, title_txt_en#20293, body_txt_en#20294, person_s#20295, h3_1_s#20296, h3_2_s#20297, tags#20298]
         :        +- SubqueryAlias spark_catalog.default.wikipedia_for_solr
         :           +- Relation[id#20292,title_txt_en#20293,body_txt_en#20294,person_s#20295,h3_1_s#20296,h3_2_s#20297,tags#20298] parquet
         +- Project [id#20278, title_txt_en#20279, null AS body_txt_en#20306, person_s#20280, null AS h3_1_s#20307, null AS h3_2_s#20308, null AS tags#20309]
            +- GlobalLimit 2
               +- LocalLimit 2
                  +- Project [base64(cast(sha2(cast(title#20281 as binary), 224) as binary)) AS id#20278, title#20281 AS title_txt_en#20279, array(George Washington) AS person_s#20280]
                     +- SubqueryAlias spark_catalog.default.wikipedia_silver_structured_templates
                        +- Relation[title#20281,body#20282,infoboxes#20283,templates#20284,extlinks#20285,wikilinks#20286,coords#20287,_error_#20288] parquet


In [113]:
df = spark.readStream.format("delta").option("ignoreChanges",True).table("wikipedia_for_solr")
df = spark.readStream.format("delta").option("ignoreChanges",True).load('./tables/try_2_wikipedia_for_solr')


AnalysisException: Table default.wikipedia_for_solr does not support either micro-batch or continuous scan.;
SubqueryAlias spark_catalog.default.wikipedia_for_solr
+- StreamingRelationV2 default.wikipedia_for_solr, DeltaTableV2(org.apache.spark.sql.SparkSession@5c128f74,file:/home/jovyan/work/wikipedia_in_spark/notebooks/spark-warehouse/wikipedia_for_solr,Some(CatalogTable(
Database: default
Table: wikipedia_for_solr
Owner: jovyan
Created Time: Mon May 24 04:10:12 UTC 2021
Last Access: UNKNOWN
Created By: Spark 3.1.1
Type: MANAGED
Provider: delta
Location: file:/home/jovyan/work/wikipedia_in_spark/notebooks/spark-warehouse/wikipedia_for_solr
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Partition Provider: Catalog)),Some(default.wikipedia_for_solr),None,org.apache.spark.sql.util.CaseInsensitiveStringMap@1f), [ignoreChanges=true], [id#19790, title_txt_en#19791, body_txt_en#19792, person_s#19793, h3_1_s#19794, h3_2_s#19795, tags#19796], org.apache.spark.sql.delta.catalog.DeltaCatalog@450f4666, default.wikipedia_for_solr


In [101]:
df.isStreaming

True

In [89]:
import json
data = spark.sql("""
  SELECT
   sha2('Spark',256),
   sha2('Spark',224),
   md5('Spark'),
   base64(sha2('Spark', 224))
   ;
""").take(1)
print(json.dumps(data,indent=2))

[
  [
    "529bc3b07127ecb7e53a4dcf1991d9152c24537d919178022b2c42657f79a26b",
    "\ufffd\ufffdIqg\ufffd6\ufffd!\ufffd\ufffd\u001c\u000ft\ufffdwZ*|`\u0007=b\ufffd\u0004T\ufffd",
    "8cde774d6f7333752ed72cacddb05126",
    "2+q5SXFnjTavIZWFHA90hXdaKnxgBz1i/ARUnA=="
  ]
]
